BUPTAnderson opened a new issue, #13217:
URL: https://github.com/apache/hudi/issues/13217

   **ClusteringOperator read parquet file error with incompatible types**
   
   **FLINK SQL**
   ```
   DROP TABLE IF EXISTS 
stream_catalog.mts_moirai.bl_mts_moirai_audit_center_inspection_hudi_detail;
   CREATE TABLE 
stream_catalog.mts_moirai.bl_mts_moirai_audit_center_inspection_hudi_detail(
        App STRING,
        AppId STRING,
        business_name STRING,
        business_key STRING,
        ticket_id STRING,
        queue_name STRING,
        queue_key STRING,
        report_id STRING,
        remote_id STRING,
        in_time STRING,
        audit_time STRING,
        monitor_time STRING,
        listen_time STRING,
        last_auditor_account STRING,
        last_audit_time STRING,
        auditor_name STRING,
        auditor_group STRING,
        auditor_id STRING,
        audit_action_name STRING,
        audit_action_key STRING,
        is_spam_feedback INT,
        is_security_feedback INT,
        is_fraust_feedback INT,
        is_review INT,
        review_queue STRING,
        review_audit_count INT,
        review_recheck_time INT,
        is_inspection INT,
        is_appeal INT,
        final_action STRING,
        is_test INT,
        audit_content STRING,
        review_content STRING,
        inspection_content STRING,
        appeal_content STRING,
        event_timestamp TIMESTAMP,
        partition_date STRING,
        is_inspection_appeal STRING,
        is_mark STRING,
        PRIMARY KEY (event_timestamp) NOT ENFORCED
   ),
   PARTITIONED BY (partition_date) WITH (
       'connector' = 'hudi',
       'path' = 
'hdfs://router/user/mts_moirai/bl_mts_moirai_audit_center_inspection_hudi_detail',
       'table.type' = 'COPY_ON_WRITE',
       'hoodie.payload.event.time.field' = 'event_timestamp',
       'write.operation' = 'insert',
       'write.tasks' = '3',
       'clustering.tasks' = '3',
       'clustering.schedule.enabled' = 'true',
       'clustering.async.enabled' = 'true',
       'clustering.delta_commits' = '20',
       'hive_sync.enable' = 'true',
       'hive_sync.mode' = 'hiveql',
       'hive_sync.db' = 'mts_moirai',
       'hive_sync.table' = 'bl_mts_moirai_audit_center_inspection_hudi_detail'
   );
   
   
   INSERT INTO 
stream_catalog.mts_moirai.bl_mts_moirai_audit_center_inspection_hudi_detail
   SELECT
       App
      ,AppId
      ,business_name
      ,business_key
      ,ticket_id
      ,queue_name
      ,queue_key
      ,report_id
      ,remote_id
      ,in_time
      ,audit_time
      ,monitor_time
      ,listen_time
      ,last_auditor_account
      ,last_audit_time
      ,auditor_name
      ,auditor_group
      ,auditor_group
      ,auditor_id
      ,audit_action_name
      ,audit_action_key
      ,is_spam_feedback
      ,is_security_feedback
      ,is_fraust_feedback
      ,is_review
      ,review_queue
      ,review_audit_count
      ,review_recheck_time
      ,is_inspection
      ,is_appeal
      ,final_action
      ,is_test
      ,audit_content
      ,review_content
      ,inspection_content
      ,appeal_content
      ,TO_TIMESTAMP(event_time, 'yyyy-MM-dd HH:mm:ss') AS event_timestamp
      ,date_format(event_time, 'yyyyMMdd') AS partition_date
      ,is_inspection_appeal AS is_inspection_appeal
      ,is_mark AS is_mark
       FROM (
        SELECT
            (CASE WHEN TRIM(App) IS NOT NULL THEN TRIM(App) ELSE '' END) AS App
             ,(CASE WHEN TRIM(AppId) IS NOT NULL THEN TRIM(AppId) ELSE '' END) 
AS AppId
             ,(CASE WHEN TRIM(business_name) IS NOT NULL THEN 
TRIM(business_name) ELSE '' END) AS business_name
             ,(CASE WHEN TRIM(business_key) IS NOT NULL THEN TRIM(business_key) 
ELSE '' END) AS business_key
             ,(CASE WHEN TRIM(ticket_id) IS NOT NULL THEN TRIM(ticket_id) ELSE 
'' END) AS ticket_id
             ,(CASE WHEN TRIM(queue_name) IS NOT NULL THEN TRIM(queue_name) 
ELSE '' END) AS queue_name
             ,(CASE WHEN TRIM(queue_key) IS NOT NULL THEN queue_key ELSE '' 
END) AS queue_key
             ,(CASE WHEN TRIM(report_id) IS NOT NULL THEN report_id ELSE '' 
END) AS report_id
             ,(CASE WHEN TRIM(remote_id) IS NOT NULL THEN remote_id ELSE '' 
END) AS remote_id
             ,(CASE WHEN TRIM(in_time) IS NOT NULL THEN in_time ELSE '' END) AS 
in_time
             ,(CASE WHEN TRIM(audit_time) IS NOT NULL THEN audit_time ELSE '' 
END) AS audit_time
             ,(CASE WHEN TRIM(monitor_time) IS NOT NULL THEN monitor_time ELSE 
'' END) AS monitor_time
             ,(CASE WHEN TRIM(listen_time) IS NOT NULL THEN listen_time ELSE '' 
END) AS listen_time
             ,(CASE WHEN TRIM(last_auditor_account) IS NOT NULL THEN 
last_auditor_account ELSE '' END) AS last_auditor_account
             ,(CASE WHEN TRIM(last_audit_time) IS NOT NULL THEN last_audit_time 
ELSE '' END) AS last_audit_time
             ,(CASE WHEN TRIM(auditor_name) IS NOT NULL THEN auditor_name ELSE 
'' END) AS auditor_name
             ,(CASE WHEN TRIM(auditor_group) IS NOT NULL THEN auditor_group 
ELSE '' END) AS auditor_group
             ,(CASE WHEN TRIM(auditor_id) IS NOT NULL THEN auditor_id ELSE '' 
END) AS auditor_id
             ,(CASE WHEN TRIM(audit_action_name) IS NOT NULL THEN 
audit_action_name ELSE '' END) AS audit_action_name
             ,(CASE WHEN TRIM(audit_action_key) IS NOT NULL THEN 
audit_action_key ELSE '' END) AS audit_action_key
             ,(CASE WHEN is_spam_feedback IS NOT NULL THEN is_spam_feedback 
ELSE 0 END) AS is_spam_feedback
             ,(CASE WHEN is_security_feedback IS NOT NULL THEN 
is_security_feedback ELSE 0 END) AS is_security_feedback
             ,(CASE WHEN is_fraust_feedback IS NOT NULL THEN is_fraust_feedback 
ELSE 0 END) AS is_fraust_feedback
             ,(CASE WHEN is_review IS NOT NULL THEN is_review ELSE 0 END) AS 
is_review
             ,(CASE WHEN TRIM(review_queue) IS NOT NULL THEN review_queue ELSE 
'' END) AS review_queue
             ,(CASE WHEN review_audit_count IS NOT NULL THEN review_audit_count 
ELSE 0 END) AS review_audit_count
             ,(CASE WHEN review_recheck_time IS NOT NULL THEN 
review_recheck_time ELSE 0 END) AS review_recheck_time
             ,(CASE WHEN is_inspection IS NOT NULL THEN is_inspection ELSE 0 
END) AS is_inspection
             ,(CASE WHEN is_appeal IS NOT NULL THEN is_appeal ELSE 0 END) AS 
is_appeal
             ,(CASE WHEN TRIM(final_action) IS NOT NULL THEN final_action ELSE 
'' END) AS final_action
             ,(CASE WHEN is_test IS NOT NULL THEN is_test ELSE 0 END) AS is_test
             ,(CASE WHEN TRIM(audit_content) IS NOT NULL THEN audit_content 
ELSE '' END) AS audit_content
             ,(CASE WHEN TRIM(review_content) IS NOT NULL THEN review_content 
ELSE '' END) AS review_content
             ,(CASE WHEN TRIM(inspection_content) IS NOT NULL THEN 
inspection_content ELSE '' END) AS inspection_content
             ,(CASE WHEN TRIM(appeal_content) IS NOT NULL THEN appeal_content 
ELSE '' END) AS appeal_content
             ,(CASE WHEN TRIM(event_time) IS NOT NULL THEN event_time ELSE 
'2011-01-01 00:00:00' END) AS event_time
             ,(CASE WHEN is_inspection_appeal IS NOT NULL THEN 
is_inspection_appeal ELSE '0' END) AS is_inspection_appeal
                  ,(CASE WHEN is_mark IS NOT NULL THEN is_mark ELSE '0' END) AS 
is_mark
        FROM stream_catalog.mts_moirai.stl_audit_center_inspection_kafka_log
                 ) inspection_log
   ;
   ```
   
   **ERROR LOG**
   ```
   2025-04-24 12:34:58,857 ERROR 
org.apache.hudi.sink.clustering.ClusteringOperator           [] - Executor 
executes action [Execute clustering for instant 20241226135141790 from task 2] 
error
   org.apache.hudi.exception.HoodieException: unable to read next record from 
parquet file
        at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
 ~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
        at 
org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35) 
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
        at 
org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35) 
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
        at 
java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) 
~[?:1.8.0_121]
        at 
java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
 ~[?:1.8.0_121]
        at 
java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
 ~[?:1.8.0_121]
        at 
java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
 ~[?:1.8.0_121]
        at 
java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
 ~[?:1.8.0_121]
        at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) 
~[?:1.8.0_121]
        at 
org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
 ~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
        at 
org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:261)
 ~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
        at 
org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:194)
 ~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
        at 
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
 ~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[?:1.8.0_121]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[?:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
   Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read 
value at 0 in block -1 in file 
hdfs://router/user/mts_moirai/bl_mts_moirai_audit_center_inspection_hudi_detail/20241226/d3badaf6-c17e-4389-bf71-261222a59f68-0_1-3-0_20241226133152412.parquet
        at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:255)
 ~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) 
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) 
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
 ~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
        ... 15 more
   Caused by: org.apache.parquet.io.ParquetDecodingException: The requested 
schema is not compatible with the file schema. incompatible types: required 
int64 event_timestamp (TIMESTAMP(MICROS,true)) != required int96 event_timestamp
        at 
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
 ~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at 
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:93)
 ~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at 
org.apache.parquet.schema.PrimitiveType.accept(PrimitiveType.java:596) 
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at 
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:83)
 ~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at 
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
 ~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at org.apache.parquet.schema.MessageType.accept(MessageType.java:55) 
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at 
org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162) 
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at 
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
 ~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
 ~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) 
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) 
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
        at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
 ~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
        ... 15 more
   ```
   
   flink version : 1.14.3
   hudi version: 0.13.0.1
   
   
   
   Because of this error, the clustering (file-merge) operation cannot be 
executed, and a very large number of small files has accumulated. Is there a solution?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to