seekforshell opened a new issue, #10012:
URL: https://github.com/apache/hudi/issues/10012
Describe the problem you faced
Spark reads invalid timestamp(3) data when the record in the log file is older than the same record in the parquet (base) file.
To Reproduce
1. Create a MOR table with a timestamp(3) column, e.g.:
CREATE EXTERNAL TABLE `xxx.bucket_mor_t2`(
`_hoodie_commit_time` string COMMENT '',
`_hoodie_commit_seqno` string COMMENT '',
`_hoodie_record_key` string COMMENT '',
`_hoodie_partition_path` string COMMENT '',
`_hoodie_file_name` string COMMENT '',
`source_from` int COMMENT '',
`id` bigint COMMENT '',
`name` string COMMENT '',
`create_time` timestamp COMMENT '',
`price` decimal(14,2) COMMENT '',
`extend` string COMMENT '',
`count` bigint COMMENT '',
`create_date` date COMMENT '',
`ext_dt` timestamp COMMENT '',
`precombine_field` string COMMENT '',
`sync_deleted` int COMMENT '',
`sync_time` timestamp COMMENT '',
`__binlog_file` string COMMENT '',
`__pos` int COMMENT '',
`source_sys` int COMMENT '')
PARTITIONED BY (
`__partition_field` int COMMENT '')
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'hoodie.query.as.ro.table'='false',
'path'='hdfs://NameNodeService1/xxx/xxx/bucket_mor_t2')
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://NameNodeService1/xxx/xxx/bucket_mor_t2'
TBLPROPERTIES (
'connector'='hudi',
'hoodie.datasource.write.recordkey.field'='source_from,id',
'last_commit_time_sync'='20231106172508127',
'path'='hdfs://NameNodeService1/xxx/xxx/bucket_mor_t2',
'spark.sql.sources.provider'='hudi',
'spark.sql.sources.schema.numPartCols'='1',
'spark.sql.sources.schema.numParts'='1',
'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"source_from","type":"integer","nullable":true,"metadata":{}},{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"create_time","type":"timestamp","nullable":true,"metadata":{}},{"name":"price","type":"decimal(14,2)","nullable":true,"metadata":{}},{"name":"extend","type":"string","nullable":true,"metadata":{}},{"name":"count","type":"long","nullable":true,"metadata":{}},{"name":"create_date","type":"date","nullable":true,"metadata":{}},{"name":"ext_dt","type":"timestamp","nullable":true,"metadata":{}},{"name":"precombine_field","type":"string","nullable":true,"metadata":{}},{"name":"sync_deleted","type":"integer","nullable":true,"metadata":{}},{"name":"sync_time","type":"timestamp","nullable":true,"metadata":{}},{"name":"__binlog_file","type":"string","nullable":true,"metadata":{}},{"name":"__pos","type":"integer","nullable":true,"metadata":{}},{"name":"source_sys","type":"integer","nullable":true,"metadata":{}},{"name":"__partition_field","type":"integer","nullable":true,"metadata":{}}]}',
'spark.sql.sources.schema.partCol.0'='__partition_field',
'table.type'='MERGE_ON_READ',
'transient_lastDdlTime'='1692251328')
2. Insert new data into the parquet file with the Flink engine, e.g. insert a record (id=1)
with precombine value = 000001308800000028038927500000.
3. Mock a binlog entry for the same record as in step 2 with precombine value = 1 (which is
smaller than before) and commit it, but do not run compaction.
Finally, read the record (id=1) in snapshot mode with Spark SQL; invalid data
appears:
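For context, the merge semantics implied by steps 2-3 can be sketched as below. This is a simplified illustration of Hudi's precombine (ordering-field) rule, not the actual `HoodieRecordPayload` API; `pre_combine` and the record dicts are hypothetical names for this example:

```python
# Simplified sketch of Hudi's precombine rule: when the same record key
# exists in both the base (parquet) file and the log file, the record
# with the LARGER precombine value should win. In the repro above the
# log record carries the smaller precombine value, so a snapshot read
# should return the parquet record and its timestamp unchanged.
def pre_combine(base_record: dict, log_record: dict, ordering_field: str) -> dict:
    # Keep the record with the greater ordering value (log wins on ties).
    if log_record[ordering_field] >= base_record[ordering_field]:
        return log_record
    return base_record

base = {"id": 1, "precombine_field": 2, "create_time": "2023-11-06 17:25:08.127"}
log = {"id": 1, "precombine_field": 1, "create_time": "2023-11-06 17:25:08.127"}

merged = pre_combine(base, log, "precombine_field")
assert merged is base  # the older log record must not shadow the parquet record
```

The point of the repro is that even though the log record should lose this comparison, the snapshot read still corrupts the timestamp value while merging.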

Expected behavior
Reading a field of type timestamp(3) should return the correct timestamp value; instead, invalid data is returned.
Environment Description
- Hudi version : 0.12.1
- Spark version : 3.1.1
Additional context
here is some debug info:
- create_time in the parquet file is stored as timestamp-micros
- the same field in the avro log is stored as timestamp-millis
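This micros/millis mismatch is enough to produce garbage on its own: if a millisecond epoch value coming from the avro log is interpreted in the unit of the parquet column (microseconds), the timestamp collapses by a factor of 1000 and lands in January 1970. A minimal, self-contained illustration (the epoch value below is invented for the example, not taken from the table):

```python
from datetime import datetime, timezone

# An epoch value in MILLISECONDS, the unit timestamp(3) uses in the avro log.
epoch_millis = 1_699_264_800_000  # example value, ~2023-11-06 UTC

# Correct interpretation: divide by 1_000 to get seconds.
correct = datetime.fromtimestamp(epoch_millis / 1_000, tz=timezone.utc)

# Wrong interpretation: treating the same raw value as MICROSECONDS
# (the parquet column's unit) shrinks it by a factor of 1000.
wrong = datetime.fromtimestamp(epoch_millis / 1_000_000, tz=timezone.utc)

print(correct)  # a date in November 2023
print(wrong)    # a date in January 1970
```

This matches the symptom in the report: the merged-on-read value for create_time comes out as an early-1970 (or otherwise nonsensical) timestamp instead of the expected one.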

Stacktrace