seekforshell opened a new issue, #10012:
URL: https://github.com/apache/hudi/issues/10012

   
   Describe the problem you faced
   
   Spark read invalid timestamp(3) data when record in log is older than the 
same in parquet. 
   
   To Reproduce
   
   1. Create a MOR table with a timestamp(3) column, e.g.
    CREATE EXTERNAL TABLE `xxx.bucket_mor_t2`( 
      `_hoodie_commit_time` string COMMENT '',         
      `_hoodie_commit_seqno` string COMMENT '',        
      `_hoodie_record_key` string COMMENT '',          
      `_hoodie_partition_path` string COMMENT '',      
      `_hoodie_file_name` string COMMENT '',           
      `source_from` int COMMENT '',                    
      `id` bigint COMMENT '',                          
      `name` string COMMENT '',                        
      `create_time` timestamp COMMENT '',              
      `price` decimal(14,2) COMMENT '',                
      `extend` string COMMENT '',                      
      `count` bigint COMMENT '',                       
      `create_date` date COMMENT '',                   
      `ext_dt` timestamp COMMENT '',                   
      `precombine_field` string COMMENT '',            
      `sync_deleted` int COMMENT '',                   
      `sync_time` timestamp COMMENT '',                
      `__binlog_file` string COMMENT '',               
      `__pos` int COMMENT '',                          
      `source_sys` int COMMENT '')                     
    PARTITIONED BY (                                   
      `__partition_field` int COMMENT '')              
    ROW FORMAT SERDE                                   
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  
    WITH SERDEPROPERTIES (                             
      'hoodie.query.as.ro.table'='false',              
      'path'='hdfs://NameNodeService1/xxx/xxx/bucket_mor_t2')  
    STORED AS INPUTFORMAT                              
      'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  
    OUTPUTFORMAT                                       
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
    LOCATION                                           
      'hdfs://NameNodeService1/xxx/xxx/bucket_mor_t2' 
    TBLPROPERTIES (                                    
      'connector'='hudi',                              
      'hoodie.datasource.write.recordkey.field'='source_from,id',  
      'last_commit_time_sync'='20231106172508127',     
      'path'='hdfs://NameNodeService1/xxx/xxx/bucket_mor_t2',  
      'spark.sql.sources.provider'='hudi',             
      'spark.sql.sources.schema.numPartCols'='1',      
      'spark.sql.sources.schema.numParts'='1',         
      
      'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"source_from","type":"integer","nullable":true,"metadata":{}},{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"create_time","type":"timestamp","nullable":true,"metadata":{}},{"name":"price","type":"decimal(14,2)","nullable":true,"metadata":{}},{"name":"extend","type":"string","nullable":true,"metadata":{}},{"name":"count","type":"long","nullable":true,"metadata":{}},{"name":"create_date","type":"date","nullable":true,"metadata":{}},{"name":"ext_dt","type":"timestamp","nullable":true,"metadata":{}},{"name":"precombine_field","type":"string","nullable":true,"metadata":{}},{"name":"sync_deleted","type":"integer","nullable":true,"metadata":{}},{"name":"sync_time","type":"timestamp","nullable":true,"metadata":{}},{"name":"__binlog_file","type":"string","nullable":true,"metadata":{}},{"name":"__pos","type":"integer","nullable":true,"metadata":{}},{"name":"source_sys","type":"integer","nullable":true,"metadata":{}},{"name":"__partition_field","type":"integer","nullable":true,"metadata":{}}]}',
      'spark.sql.sources.schema.partCol.0'='__partition_field',  
      'table.type'='MERGE_ON_READ',                    
      'transient_lastDdlTime'='1692251328')            
   
   2. Insert new data into the parquet base file with the Flink engine, e.g. 
insert a record (id=1) with precombine value = 000001308800000028038927500000.
   
   3. Mock a binlog event for the same record as in step 2 with precombine 
value = 1 (smaller than before), commit it to the log file, but do not run 
compaction.
   
   Finally, read the record (id=1) in snapshot mode with Spark SQL; invalid 
data appears:
   
   
![b6c3e286dd36ef29f47f6ec569983e82](https://github.com/apache/hudi/assets/8132965/06d3a4b5-ae06-4387-9b2a-0e6b12127e2a)
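   For background on why step 3 should not change the visible row: in a MOR 
snapshot read, duplicate versions of a record key are expected to be resolved 
by the precombine (ordering) field, with the larger value winning. A minimal 
Python sketch of that expectation (an illustration only, not Hudi's actual 
merge code; field names come from the table DDL above):

```python
# Sketch of the expected MOR snapshot-merge rule (illustration, not Hudi code):
# for a given record key, the version with the larger precombine value wins,
# so the base-file record from step 2 should be returned unchanged.
def merge(base_record, log_record):
    # The table stores the ordering value as a zero-padded string; compare it
    # numerically here for simplicity.
    if int(log_record["precombine_field"]) >= int(base_record["precombine_field"]):
        return log_record
    return base_record

base = {"id": 1, "precombine_field": "000001308800000028038927500000"}  # parquet
log = {"id": 1, "precombine_field": "1"}                                # avro log

# The log record is older, so the parquet record should survive intact.
assert merge(base, log) is base
```

   So the parquet record wins the merge, yet the value that comes back is 
corrupted, which points at how that record is (de)serialized during the 
merged read rather than at the merge ordering itself.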
   
   
   Expected behavior
   
   Reading a timestamp(3) field should return valid data, i.e. the untouched 
values from the parquet base file.
   
   Environment Description
   
   - Hudi version : 0.12.1
   - Spark version : 3.1.1
   
   Additional context
   Here is some debug info. create_time in the parquet base file is stored as 
timestamp-micros:
   
![1699411557224_E98295A7-92FC-45c1-996E-E88960A58F1F](https://github.com/apache/hudi/assets/8132965/9d7908a4-7a4b-46ec-9c5e-ddc8c888edce)
   
   
![41683b8bd67a4bd2e139fad1d11c7594](https://github.com/apache/hudi/assets/8132965/bb6e90a9-c4b4-448b-83d2-d093f00a5f9c)
   while in the avro log it is stored as timestamp-millis:
   
![0e9e436e5ebe71463d39c7a50c290f30](https://github.com/apache/hudi/assets/8132965/649f1564-ebc9-48fa-acc6-8daf18c99e51)
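   The two encodings above differ by a factor of 1000, so a raw long written 
in one unit but decoded in the other yields a garbage instant. A small 
standalone Python illustration of the mismatch (the concrete timestamp is made 
up for the example; this is not Hudi code):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def from_micros(us: int) -> datetime:
    """Decode an epoch-microseconds long, as a timestamp-micros reader would."""
    return EPOCH + timedelta(microseconds=us)

millis = 1_699_291_508_127  # a hypothetical instant as epoch millis (timestamp(3))
micros = millis * 1_000     # the same instant in the parquet unit (timestamp-micros)

# Decoding the micros long with the micros reader round-trips correctly:
right = from_micros(micros)
assert right == datetime(2023, 11, 6, 17, 25, 8, 127000, tzinfo=timezone.utc)

# But feeding the millis long into the micros reader shrinks the instant
# 1000x, landing in January 1970 - exactly the kind of invalid data above:
wrong = from_micros(millis)
assert wrong.year == 1970
```

   Decoding in the opposite direction (a micros long read as millis) 
overshoots by the same factor instead.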
   
   Stacktrace
   
   
   
   

