parisni opened a new issue, #8222:
URL: https://github.com/apache/hudi/issues/8222

   Tested on Hudi 0.12 and 0.13 with Spark 3.2.1.
   
   COW and MOR tables do not return the same result for an incremental read. Here is reproducible source code + output for both.
   
   For an incremental read bounded to the last commit, MOR returns:
   ```
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317202718414|20230317202718414...|                 a|                   foo|20c4c9f3-2cbc-439...|   a|  9| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   ```
   while COW returns:
   ```
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   ```
   COW is right and MOR is wrong: the precombine value written by the last commit (ts=9) is lower than that of the existing record (ts=11), so that record should not be returned (see the sketch below).
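   
   As a minimal sketch of the merge rule this expectation relies on (assuming DefaultHoodieRecordPayload keeps the record with the higher precombine value; tie handling ignored):
   
   ```python
   # Simplified model of the expected merge (assumption: DefaultHoodieRecordPayload
   # keeps the record whose precombine field "ts" is higher; ties are ignored here).
   def merge(existing, incoming):
       return incoming if incoming["ts"] > existing["ts"] else existing
   
   existing = {"uuid": "a", "ts": 11}  # written by the second commit
   incoming = {"uuid": "a", "ts": 9}   # written by the last commit
   assert merge(existing, incoming)["ts"] == 11
   # The ts=9 record loses the merge, so an incremental read bounded to the last
   # commit should return no rows, which is what the COW table does.
   ```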
   
   ```python
   tableName = "test_hudi_pyspark_local"
   basePath = f"/tmp/{tableName}"
   
   df = spark.sql("select 'a' as uuid, 10 as ts, 'foo' as part")
   #mode="COPY_ON_WRITE"
   mode="MERGE_ON_READ"
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.table.type": mode,
       "hoodie.datasource.write.payload.class": 
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "part",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.upsert.shuffle.parallelism": 2,
       "hoodie.insert.shuffle.parallelism": 2,
       "hoodie.datasource.hive_sync.enable": "false",
       "hoodie.datasource.hive_sync.partition_fields": "part",
       "hoodie.datasource.hive_sync.partition_extractor_class": 
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
   }
   (df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317202715405|20230317202715405...|                 a|                   foo|20c4c9f3-2cbc-439...|   a| 10| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   
   df = spark.sql("select 'a' as uuid, 11 as ts, 'foo' as part")
   (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317202717144|20230317202717144...|                 a|                   foo|20c4c9f3-2cbc-439...|   a| 11| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   
   df = spark.sql("select 'a' as uuid, 9 as ts, 'foo' as part")
   (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317202717144|20230317202717144...|                 a|                   foo|20c4c9f3-2cbc-439...|   a| 11| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   
   
   # look into the timeline to get the latest commit
   # then read incremental
   # $ ll /tmp/test_hudi_pyspark_local/.hoodie/
   #  .schema
   #  archived
   #  .aux
   #  20230317202715405.deltacommit.requested
   #  .20230317202715405.deltacommit.requested.crc
   #  metadata
   #  hoodie.properties
   #  .hoodie.properties.crc
   #  20230317202715405.deltacommit.inflight
   #  .20230317202715405.deltacommit.inflight.crc
   #  20230317202715405.deltacommit
   #  .20230317202715405.deltacommit.crc
   #  20230317202717144.deltacommit.requested
   #  .20230317202717144.deltacommit.requested.crc
   #  20230317202717144.deltacommit.inflight
   #  .20230317202717144.deltacommit.inflight.crc
   #  .20230317202717144.deltacommit.crc
   #  20230317202717144.deltacommit
   #  20230317202718414.deltacommit.requested
   #  .20230317202718414.deltacommit.requested.crc
   #  .20230317202718414.deltacommit.inflight.crc
   #  20230317202718414.deltacommit.inflight
   #  .20230317202718414.deltacommit.crc
   #  20230317202718414.deltacommit
   #  .temp
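   # (Hedged alternative to reading the listing above by eye: the completed
   # instants can be derived from the timeline file names with the stdlib.)
   import os, re
   completed = sorted(
       f.split(".")[0]
       for f in os.listdir(f"{basePath}/.hoodie")
       if re.fullmatch(r"\d+\.(delta)?commit", f)
   )
   # completed[-1] == '20230317202718414' for the timeline above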
   last_commit=20230317202718414
   hudi_incremental = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.table.type": mode,
       "hoodie.datasource.write.payload.class": 
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "part",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.upsert.shuffle.parallelism": 2,
       "hoodie.insert.shuffle.parallelism": 2,
       "hoodie.datasource.hive_sync.enable": "false",
       "hoodie.datasource.query.type": "incremental",
       "hoodie.datasource.read.begin.instanttime": str(last_commit -1),
       "hoodie.datasource.read.end.instanttime": str(last_commit),
   }
   spark.read.format("hudi").options(**hudi_incremental).load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317202718414|20230317202718414...|                 a|                   foo|20c4c9f3-2cbc-439...|   a|  9| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   ```
   
   ```python
   sc.setLogLevel("WARN")
   
   tableName = "test_hudi_pyspark_local"
   basePath = f"/tmp/{tableName}"
   
   df = spark.sql("select 'a' as uuid, 10 as ts, 'foo' as part")
   mode="COPY_ON_WRITE"
   #mode="MERGE_ON_READ"
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.table.type": mode,
       "hoodie.datasource.write.payload.class": 
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "part",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.upsert.shuffle.parallelism": 2,
       "hoodie.insert.shuffle.parallelism": 2,
       "hoodie.datasource.hive_sync.enable": "false",
       "hoodie.datasource.hive_sync.partition_fields": "part",
       "hoodie.datasource.hive_sync.partition_extractor_class": 
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
   }
   (df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317203659919|20230317203659919...|                 a|                   foo|e90f346f-d878-439...|   a| 10| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   df = spark.sql("select 'a' as uuid, 11 as ts, 'foo' as part")
   (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317203701577|20230317203701577...|                 a|                   foo|e90f346f-d878-439...|   a| 11| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   df = spark.sql("select 'a' as uuid, 9 as ts, 'foo' as part")
   (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   |  20230317203701577|20230317203701577...|                 a|                   foo|e90f346f-d878-439...|   a| 11| foo|
   +-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
   
   # look into the timeline to get the latest commit
   # then read incremental
   # $ ll /tmp/test_hudi_pyspark_local/.hoodie/
   # .schema
   # .aux
   # archived
   # 20230317203659919.commit.requested
   # .20230317203659919.commit.requested.crc
   # metadata
   # hoodie.properties
   # .hoodie.properties.crc
   # .20230317203659919.inflight.crc
   # 20230317203659919.inflight
   # .20230317203659919.commit.crc
   # 20230317203659919.commit
   # 20230317203701577.commit.requested
   # .20230317203701577.commit.requested.crc
   # .20230317203701577.inflight.crc
   # 20230317203701577.inflight
   # .20230317203701577.commit.crc
   # 20230317203701577.commit
   # 20230317203702776.commit.requested
   # .20230317203702776.commit.requested.crc
   # .20230317203702776.inflight.crc
   # 20230317203702776.inflight
   # .20230317203702776.commit.crc
   # 20230317203702776.commit
   last_commit=20230317203702776
   hudi_incremental = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.table.type": mode,
       "hoodie.datasource.write.payload.class": 
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "part",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.upsert.shuffle.parallelism": 2,
       "hoodie.insert.shuffle.parallelism": 2,
       "hoodie.datasource.hive_sync.enable": "false",
       "hoodie.datasource.query.type": "incremental",
       "hoodie.datasource.read.begin.instanttime": str(last_commit -1),
       "hoodie.datasource.read.end.instanttime": str(last_commit),
   }
   spark.read.format("hudi").options(**hudi_incremental).load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|uuid| ts|part|
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   +-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
   ```
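   
   For reference, a minimal check of the expected behaviour (reusing the `hudi_incremental` options from either repro above) might look like the sketch below; per the expectation stated above it passes on the COW table and currently fails on the MOR table:
   
   ```python
   # Hedged sketch of a regression check for this issue: an incremental read
   # bounded to the last commit should be empty, because the ts=9 record loses
   # the precombine against the existing ts=11 record.
   inc = spark.read.format("hudi").options(**hudi_incremental).load(basePath)
   assert inc.count() == 0, "incremental read of the last commit should return no rows"
   ```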
   
   

