BUPTAnderson opened a new issue, #13217:
URL: https://github.com/apache/hudi/issues/13217
**ClusteringOperator read parquet file error with incompatible types**
**FLINK SQL**
```
DROP TABLE IF EXISTS
stream_catalog.mts_moirai.bl_mts_moirai_audit_center_inspection_hudi_detail;
CREATE TABLE
stream_catalog.mts_moirai.bl_mts_moirai_audit_center_inspection_hudi_detail(
App STRING,
AppId STRING,
business_name STRING,
business_key STRING,
ticket_id STRING,
queue_name STRING,
queue_key STRING,
report_id STRING,
remote_id STRING,
in_time STRING,
audit_time STRING,
monitor_time STRING,
listen_time STRING,
last_auditor_account STRING,
last_audit_time STRING,
auditor_name STRING,
auditor_group STRING,
auditor_id STRING,
audit_action_name STRING,
audit_action_key STRING,
is_spam_feedback INT,
is_security_feedback INT,
is_fraust_feedback INT,
is_review INT,
review_queue STRING,
review_audit_count INT,
review_recheck_time INT,
is_inspection INT,
is_appeal INT,
final_action STRING,
is_test INT,
audit_content STRING,
review_content STRING,
inspection_content STRING,
appeal_content STRING,
event_timestamp TIMESTAMP,
partition_date STRING,
is_inspection_appeal STRING,
is_mark STRING,
PRIMARY KEY (event_timestamp) NOT ENFORCED
),
PARTITIONED BY (partition_date) WITH (
'connector' = 'hudi',
'path' =
'hdfs://router/user/mts_moirai/bl_mts_moirai_audit_center_inspection_hudi_detail',
'table.type' = 'COPY_ON_WRITE',
'hoodie.payload.event.time.field' = 'event_timestamp',
'write.operation' = 'insert',
'write.tasks' = '3',
'clustering.tasks' = '3',
'clustering.schedule.enabled' = 'true',
'clustering.async.enabled' = 'true',
'clustering.delta_commits' = '20',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hiveql',
'hive_sync.db' = 'mts_moirai',
'hive_sync.table' = 'bl_mts_moirai_audit_center_inspection_hudi_detail'
);
INSERT INTO
stream_catalog.mts_moirai.bl_mts_moirai_audit_center_inspection_hudi_detail
SELECT
App
,AppId
,business_name
,business_key
,ticket_id
,queue_name
,queue_key
,report_id
,remote_id
,in_time
,audit_time
,monitor_time
,listen_time
,last_auditor_account
,last_audit_time
,auditor_name
,auditor_group
,auditor_group
,auditor_id
,audit_action_name
,audit_action_key
,is_spam_feedback
,is_security_feedback
,is_fraust_feedback
,is_review
,review_queue
,review_audit_count
,review_recheck_time
,is_inspection
,is_appeal
,final_action
,is_test
,audit_content
,review_content
,inspection_content
,appeal_content
,TO_TIMESTAMP(event_time, 'yyyy-MM-dd HH:mm:ss') AS event_timestamp
,date_format(event_time, 'yyyyMMdd') AS partition_date
,is_inspection_appeal AS is_inspection_appeal
,is_mark AS is_mark
FROM (
SELECT
(CASE WHEN TRIM(App) IS NOT NULL THEN TRIM(App) ELSE '' END) AS App
,(CASE WHEN TRIM(AppId) IS NOT NULL THEN TRIM(AppId) ELSE '' END)
AS AppId
,(CASE WHEN TRIM(business_name) IS NOT NULL THEN
TRIM(business_name) ELSE '' END) AS business_name
,(CASE WHEN TRIM(business_key) IS NOT NULL THEN TRIM(business_key)
ELSE '' END) AS business_key
,(CASE WHEN TRIM(ticket_id) IS NOT NULL THEN TRIM(ticket_id) ELSE
'' END) AS ticket_id
,(CASE WHEN TRIM(queue_name) IS NOT NULL THEN TRIM(queue_name)
ELSE '' END) AS queue_name
,(CASE WHEN TRIM(queue_key) IS NOT NULL THEN queue_key ELSE ''
END) AS queue_key
,(CASE WHEN TRIM(report_id) IS NOT NULL THEN report_id ELSE ''
END) AS report_id
,(CASE WHEN TRIM(remote_id) IS NOT NULL THEN remote_id ELSE ''
END) AS remote_id
,(CASE WHEN TRIM(in_time) IS NOT NULL THEN in_time ELSE '' END) AS
in_time
,(CASE WHEN TRIM(audit_time) IS NOT NULL THEN audit_time ELSE ''
END) AS audit_time
,(CASE WHEN TRIM(monitor_time) IS NOT NULL THEN monitor_time ELSE
'' END) AS monitor_time
,(CASE WHEN TRIM(listen_time) IS NOT NULL THEN listen_time ELSE ''
END) AS listen_time
,(CASE WHEN TRIM(last_auditor_account) IS NOT NULL THEN
last_auditor_account ELSE '' END) AS last_auditor_account
,(CASE WHEN TRIM(last_audit_time) IS NOT NULL THEN last_audit_time
ELSE '' END) AS last_audit_time
,(CASE WHEN TRIM(auditor_name) IS NOT NULL THEN auditor_name ELSE
'' END) AS auditor_name
,(CASE WHEN TRIM(auditor_group) IS NOT NULL THEN auditor_group
ELSE '' END) AS auditor_group
,(CASE WHEN TRIM(auditor_id) IS NOT NULL THEN auditor_id ELSE ''
END) AS auditor_id
,(CASE WHEN TRIM(audit_action_name) IS NOT NULL THEN
audit_action_name ELSE '' END) AS audit_action_name
,(CASE WHEN TRIM(audit_action_key) IS NOT NULL THEN
audit_action_key ELSE '' END) AS audit_action_key
,(CASE WHEN is_spam_feedback IS NOT NULL THEN is_spam_feedback
ELSE 0 END) AS is_spam_feedback
,(CASE WHEN is_security_feedback IS NOT NULL THEN
is_security_feedback ELSE 0 END) AS is_security_feedback
,(CASE WHEN is_fraust_feedback IS NOT NULL THEN is_fraust_feedback
ELSE 0 END) AS is_fraust_feedback
,(CASE WHEN is_review IS NOT NULL THEN is_review ELSE 0 END) AS
is_review
,(CASE WHEN TRIM(review_queue) IS NOT NULL THEN review_queue ELSE
'' END) AS review_queue
,(CASE WHEN review_audit_count IS NOT NULL THEN review_audit_count
ELSE 0 END) AS review_audit_count
,(CASE WHEN review_recheck_time IS NOT NULL THEN
review_recheck_time ELSE 0 END) AS review_recheck_time
,(CASE WHEN is_inspection IS NOT NULL THEN is_inspection ELSE 0
END) AS is_inspection
,(CASE WHEN is_appeal IS NOT NULL THEN is_appeal ELSE 0 END) AS
is_appeal
,(CASE WHEN TRIM(final_action) IS NOT NULL THEN final_action ELSE
'' END) AS final_action
,(CASE WHEN is_test IS NOT NULL THEN is_test ELSE 0 END) AS is_test
,(CASE WHEN TRIM(audit_content) IS NOT NULL THEN audit_content
ELSE '' END) AS audit_content
,(CASE WHEN TRIM(review_content) IS NOT NULL THEN review_content
ELSE '' END) AS review_content
,(CASE WHEN TRIM(inspection_content) IS NOT NULL THEN
inspection_content ELSE '' END) AS inspection_content
,(CASE WHEN TRIM(appeal_content) IS NOT NULL THEN appeal_content
ELSE '' END) AS appeal_content
,(CASE WHEN TRIM(event_time) IS NOT NULL THEN event_time ELSE
'2011-01-01 00:00:00' END) AS event_time
,(CASE WHEN is_inspection_appeal IS NOT NULL THEN
is_inspection_appeal ELSE '0' END) AS is_inspection_appeal
,(CASE WHEN is_mark IS NOT NULL THEN is_mark ELSE '0' END) AS
is_mark
FROM stream_catalog.mts_moirai.stl_audit_center_inspection_kafka_log
) inspection_log
;
```
**ERROR LOG**
```
2025-04-24 12:34:58,857 ERROR
org.apache.hudi.sink.clustering.ClusteringOperator [] - Executor
executes action [Execute clustering for instant 20241226135141790 from task 2]
error
org.apache.hudi.exception.HoodieException: unable to read next record from
parquet file
at
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
at
org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
at
org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
at
java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
~[?:1.8.0_121]
at
java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
~[?:1.8.0_121]
at
java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
~[?:1.8.0_121]
at
java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
~[?:1.8.0_121]
at
java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
~[?:1.8.0_121]
at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
~[?:1.8.0_121]
at
org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
at
org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:261)
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
at
org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:194)
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
at
org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[?:1.8.0_121]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
value at 0 in block -1 in file
hdfs://router/user/mts_moirai/bl_mts_moirai_audit_center_inspection_hudi_detail/20241226/d3badaf6-c17e-4389-bf71-261222a59f68-0_1-3-0_20241226133152412.parquet
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:255)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
... 15 more
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested
schema is not compatible with the file schema. incompatible types: required
int64 event_timestamp (TIMESTAMP(MICROS,true)) != required int96 event_timestamp
at
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:93)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at
org.apache.parquet.schema.PrimitiveType.accept(PrimitiveType.java:596)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:83)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at
org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
~[flink-sql-parquet_2.11-1.14.3.jar:1.14.3]
at
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
~[hudi-flink1.14-bundle-0.13.0.1.jar:0.13.0.1]
... 15 more
```
flink version : 1.14.3
hudi version: 0.13.0.1
Due to this error, the merge operation could not be executed and a very
large number of files have been generated. Is there a solution?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]