parisni opened a new issue, #8222:
URL: https://github.com/apache/hudi/issues/8222
Tested on 0.12 and 0.13 with Spark 3.2.1.

COW and MOR tables do not return the same result for an incremental read. Below is reproducible source code plus the output for both.

For the last commit, MOR returns:
```
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|  20230317202718414|20230317202718414...|                 a|                   foo|20c4c9f3-2cbc-439...|   a|  9| foo|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
```
while COW returns:
```
+-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|uuid| ts|part|
+-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
+-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
COW is correct and MOR is wrong: the precombine value (ts) of the record written in the last commit is lower than that of the existing record, so that record should lose the merge and not be shown.
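For context, here is a minimal sketch (plain Python, not Hudi code) of the merge behaviour the reproduction expects from `DefaultHoodieRecordPayload`: an incoming record with a lower precombine value than the stored one loses the merge, so the ts=9 write should not surface in any read.
```python
# Illustration only, not Hudi's implementation: with DefaultHoodieRecordPayload the
# incoming record is expected to replace the stored one only when its precombine
# value is at least as large (tie handling is irrelevant for this reproduction).
def expected_merge(stored, incoming, precombine_field="ts"):
    if incoming[precombine_field] >= stored[precombine_field]:
        return incoming
    return stored

stored = {"uuid": "a", "ts": 11, "part": "foo"}
incoming = {"uuid": "a", "ts": 9, "part": "foo"}
# ts=9 loses against ts=11, so the ts=9 record should not appear anywhere.
assert expected_merge(stored, incoming) == stored
```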
```python
tableName = "test_hudi_pyspark_local"
basePath = f"/tmp/{tableName}"
df = spark.sql("select 'a' as uuid, 10 as ts, 'foo' as part")
#mode="COPY_ON_WRITE"
mode="MERGE_ON_READ"
hudi_options = {
"hoodie.table.name": tableName,
"hoodie.datasource.write.table.type": mode,
"hoodie.datasource.write.payload.class":
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
"hoodie.datasource.write.recordkey.field": "uuid",
"hoodie.datasource.write.partitionpath.field": "part",
"hoodie.datasource.write.table.name": tableName,
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.upsert.shuffle.parallelism": 2,
"hoodie.insert.shuffle.parallelism": 2,
"hoodie.datasource.hive_sync.enable": "false",
"hoodie.datasource.hive_sync.partition_fields": "part",
"hoodie.datasource.hive_sync.partition_extractor_class":
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
}
(df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
spark.read.format("hudi").load(basePath).show()
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|  20230317202715405|20230317202715405...|                 a|                   foo|20c4c9f3-2cbc-439...|   a| 10| foo|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
df = spark.sql("select 'a' as uuid, 11 as ts, 'foo' as part")
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
spark.read.format("hudi").load(basePath).show()
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|  20230317202717144|20230317202717144...|                 a|                   foo|20c4c9f3-2cbc-439...|   a| 11| foo|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
df = spark.sql("select 'a' as uuid, 9 as ts, 'foo' as part")
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
spark.read.format("hudi").load(basePath).show()
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|  20230317202717144|20230317202717144...|                 a|                   foo|20c4c9f3-2cbc-439...|   a| 11| foo|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
# look into the timeline to get the latest commit
# (a programmatic way to derive it is sketched after this block)
# then read incremental
# $ ll /tmp/test_hudi_pyspark_local/.hoodie/
# .schema
# archived
# .aux
# 20230317202715405.deltacommit.requested
# .20230317202715405.deltacommit.requested.crc
# metadata
# hoodie.properties
# .hoodie.properties.crc
# 20230317202715405.deltacommit.inflight
# .20230317202715405.deltacommit.inflight.crc
# 20230317202715405.deltacommit
# .20230317202715405.deltacommit.crc
# 20230317202717144.deltacommit.requested
# .20230317202717144.deltacommit.requested.crc
# 20230317202717144.deltacommit.inflight
# .20230317202717144.deltacommit.inflight.crc
# .20230317202717144.deltacommit.crc
# 20230317202717144.deltacommit
# 20230317202718414.deltacommit.requested
# .20230317202718414.deltacommit.requested.crc
# .20230317202718414.deltacommit.inflight.crc
# 20230317202718414.deltacommit.inflight
# .20230317202718414.deltacommit.crc
# 20230317202718414.deltacommit
# .temp
last_commit=20230317202718414
hudi_incremental = {
"hoodie.table.name": tableName,
"hoodie.datasource.write.table.type": mode,
"hoodie.datasource.write.payload.class":
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
"hoodie.datasource.write.recordkey.field": "uuid",
"hoodie.datasource.write.partitionpath.field": "part",
"hoodie.datasource.write.table.name": tableName,
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.upsert.shuffle.parallelism": 2,
"hoodie.insert.shuffle.parallelism": 2,
"hoodie.datasource.hive_sync.enable": "false",
"hoodie.datasource.query.type": "incremental",
"hoodie.datasource.read.begin.instanttime": str(last_commit -1),
"hoodie.datasource.read.end.instanttime": str(last_commit),
}
spark.read.format("hudi").options(**hudi_incremental).load(basePath).show()
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|  20230317202718414|20230317202718414...|                 a|                   foo|20c4c9f3-2cbc-439...|   a|  9| foo|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
```
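Side note: instead of copying the instant from an `ll` of `.hoodie`, the latest completed instant can be derived programmatically. A small sketch for the local-filesystem setup used here (the helper is mine, not a Hudi API):
```python
# Hypothetical helper (not part of Hudi): list the timeline directory and keep the
# newest completed commit/deltacommit instant, assuming a local filesystem path.
import os
import re

def latest_completed_instant(base_path):
    timeline = os.path.join(base_path, ".hoodie")
    instants = [
        name.split(".")[0]
        for name in os.listdir(timeline)
        if re.fullmatch(r"\d+\.(commit|deltacommit)", name)
    ]
    return max(instants) if instants else None

# e.g. last_commit = int(latest_completed_instant(basePath))
```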
```python
sc.setLogLevel("WARN")
tableName = "test_hudi_pyspark_local"
basePath = f"/tmp/{tableName}"
df = spark.sql("select 'a' as uuid, 10 as ts, 'foo' as part")
mode="COPY_ON_WRITE"
#mode="MERGE_ON_READ"
hudi_options = {
"hoodie.table.name": tableName,
"hoodie.datasource.write.table.type": mode,
"hoodie.datasource.write.payload.class":
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
"hoodie.datasource.write.recordkey.field": "uuid",
"hoodie.datasource.write.partitionpath.field": "part",
"hoodie.datasource.write.table.name": tableName,
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.upsert.shuffle.parallelism": 2,
"hoodie.insert.shuffle.parallelism": 2,
"hoodie.datasource.hive_sync.enable": "false",
"hoodie.datasource.hive_sync.partition_fields": "part",
"hoodie.datasource.hive_sync.partition_extractor_class":
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
}
(df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
spark.read.format("hudi").load(basePath).show()
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|  20230317203659919|20230317203659919...|                 a|                   foo|e90f346f-d878-439...|   a| 10| foo|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
df = spark.sql("select 'a' as uuid, 11 as ts, 'foo' as part")
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
spark.read.format("hudi").load(basePath).show()
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|  20230317203701577|20230317203701577...|                 a|                   foo|e90f346f-d878-439...|   a| 11| foo|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
df = spark.sql("select 'a' as uuid, 9 as ts, 'foo' as part")
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
spark.read.format("hudi").load(basePath).show()
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid| ts|part|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
|  20230317203701577|20230317203701577...|                 a|                   foo|e90f346f-d878-439...|   a| 11| foo|
+-------------------+--------------------+------------------+----------------------+--------------------+----+---+----+
# look into the timeline to get the latest commit
# then read incremental
# $ ll /tmp/test_hudi_pyspark_local/.hoodie/
# .schema
# .aux
# archived
# 20230317203659919.commit.requested
# .20230317203659919.commit.requested.crc
# metadata
# hoodie.properties
# .hoodie.properties.crc
# .20230317203659919.inflight.crc
# 20230317203659919.inflight
# .20230317203659919.commit.crc
# 20230317203659919.commit
# 20230317203701577.commit.requested
# .20230317203701577.commit.requested.crc
# .20230317203701577.inflight.crc
# 20230317203701577.inflight
# .20230317203701577.commit.crc
# 20230317203701577.commit
# 20230317203702776.commit.requested
# .20230317203702776.commit.requested.crc
# .20230317203702776.inflight.crc
# 20230317203702776.inflight
# .20230317203702776.commit.crc
# 20230317203702776.commit
last_commit=20230317203702776
hudi_incremental = {
"hoodie.table.name": tableName,
"hoodie.datasource.write.table.type": mode,
"hoodie.datasource.write.payload.class":
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
"hoodie.datasource.write.recordkey.field": "uuid",
"hoodie.datasource.write.partitionpath.field": "part",
"hoodie.datasource.write.table.name": tableName,
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.upsert.shuffle.parallelism": 2,
"hoodie.insert.shuffle.parallelism": 2,
"hoodie.datasource.hive_sync.enable": "false",
"hoodie.datasource.query.type": "incremental",
"hoodie.datasource.read.begin.instanttime": str(last_commit -1),
"hoodie.datasource.read.end.instanttime": str(last_commit),
}
spark.read.format("hudi").options(**hudi_incremental).load(basePath).show()
+-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|uuid| ts|part|
+-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
+-------------------+--------------------+------------------+----------------------+-----------------+----+---+----+
```
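Either reproduction could end with an explicit assertion of the expected behaviour: the stale ts=9 record must not show up in the incremental read of the last commit (today it does for MOR). A sketch, reusing the names defined above:
```python
# Sanity check (illustration): the record that lost the precombine merge (ts=9)
# must not be exposed by the incremental read of the last commit.
inc_df = spark.read.format("hudi").options(**hudi_incremental).load(basePath)
assert inc_df.filter("ts = 9").count() == 0, "stale ts=9 record leaked into the incremental read"
```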