[
https://issues.apache.org/jira/browse/HUDI-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18016616#comment-18016616
]
Hans Eschbaum edited comment on HUDI-9540 at 8/27/25 6:52 PM:
--------------------------------------------------------------
I managed to reproduce this purely in Spark as well, using
[https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.5-bundle_2.12/1.0.2/hudi-spark3.5-bundle_2.12-1.0.2.jar]
{code:python}
from pyspark.sql import Row

path = "s3://dev/test_mor"

def upsert(rows, mode):
    hudi_options = {
        'hoodie.table.name': 'test123',
        'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
        'hoodie.datasource.write.recordkey.field': 'id',
        'hoodie.datasource.write.precombine.field': 'ts',
        'hoodie.datasource.hive_sync.enable': 'false',
        'hoodie.compact.inline': 'true',
        'hoodie.compact.inline.trigger.strategy': 'NUM_COMMITS',
        'hoodie.compact.inline.max.delta.commits': '3',
        'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
        'hoodie.clean.max.commits': '2',
        'hoodie.clean.async': 'false',
        'hoodie.clean.automatic': 'true',
        'hoodie.cleaner.commits.retained': '2',
        'hoodie.keep.max.commits': '4',
        'hoodie.keep.min.commits': '2',
        'hoodie.metadata.enable': 'false'
    }
    df = spark.createDataFrame([Row(**r) for r in rows])
    (df.write
        .format("hudi")
        .option('hoodie.datasource.write.operation', 'upsert')
        .options(**hudi_options)
        .mode(mode)
        .save(path))

for i in range(1, 9):
    upsert([{"id": "id-1", "ts": i, "rider": f"r1{i}"}], "overwrite" if i == 1 else "append")

read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '000'
}
df = spark.read \
    .format('org.apache.hudi') \
    .options(**read_options) \
    .load(path)
print("incremental:")
df.show(truncate=False)

df = spark.read \
    .format('org.apache.hudi') \
    .load(path)
print("full:")
df.show(truncate=False)
{code}
{code:none}
incremental:
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+----+---+-----+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                          |id  |ts |rider|
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+----+---+-----+
|20250827183021346  |20250827183021346_0_3|id-1              |                      |0bddf683-34a7-407b-8807-33993405d6ca-0_0-342-1387_20250827183025449.parquet|id-1|6  |r16  |
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+----+---+-----+

full:
+-------------------+---------------------+------------------+----------------------+--------------------------------------+----+---+-----+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                     |id  |ts |rider|
+-------------------+---------------------+------------------+----------------------+--------------------------------------+----+---+-----+
|20250827183036110  |20250827183036110_0_7|id-1              |                      |0bddf683-34a7-407b-8807-33993405d6ca-0|id-1|8  |r18  |
+-------------------+---------------------+------------------+----------------------+--------------------------------------+----+---+-----+
{code}
Look at the *ts* and *rider* columns: the incremental read returns ts=6/r16 (the state as of the compaction), while the snapshot read returns the latest state, ts=8/r18.
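The by-eye comparison above can also be done programmatically. A minimal sketch in plain Python (the row dicts are hard-coded from the df.show() output above; with Spark they would come from df.collect() and Row.asDict(), and find_stale_keys is a hypothetical helper, not a Hudi API):

```python
def find_stale_keys(incremental_rows, snapshot_rows, key="id", precombine="ts"):
    """Return record keys whose incremental result is missing or older
    (by the precombine field) than the snapshot result."""
    incr = {r[key]: r for r in incremental_rows}
    snap = {r[key]: r for r in snapshot_rows}
    stale = {}
    for k, snap_row in snap.items():
        incr_row = incr.get(k)
        # Missing from the incremental read, or behind on the precombine field.
        if incr_row is None or incr_row[precombine] < snap_row[precombine]:
            stale[k] = (incr_row, snap_row)
    return stale

# Rows as observed in the output above.
incremental = [{"id": "id-1", "ts": 6, "rider": "r16"}]
snapshot = [{"id": "id-1", "ts": 8, "rider": "r18"}]
print(find_stale_keys(incremental, snapshot))
# → {'id-1': ({'id': 'id-1', 'ts': 6, 'rider': 'r16'}, {'id': 'id-1', 'ts': 8, 'rider': 'r18'})}
```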
Timeline last 4 commits ascending:
compaction: 20250827183025449
deltacommit: 20250827183029214
deltacommit: 20250827183036110
clean: 20250827183040021
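The timeline above was read off the instant files under the table's .hoodie directory. A rough sketch of sorting instants from a filename listing (the filenames below are hypothetical, and the exact layout varies across Hudi releases, e.g. newer completed instants may also carry a completion timestamp, so treat this as illustrative only):

```python
import re

def list_instants(filenames):
    """Parse completed-instant filenames like '<ts>.<action>' and return
    (timestamp, action) pairs in ascending timestamp order."""
    instants = []
    for name in filenames:
        # Optional '_<completion ts>' suffix; .requested/.inflight marker
        # files have an extra dot and simply don't match.
        m = re.match(r"^(\d+)(?:_\d+)?\.([a-z]+)$", name)
        if m and m.group(2) not in ("requested", "inflight"):
            instants.append((m.group(1), m.group(2)))
    return sorted(instants)

# Hypothetical listing matching the timeline above
# (a compaction completes as a .commit file).
files = [
    "20250827183025449.commit",
    "20250827183029214.deltacommit",
    "20250827183036110.deltacommit",
    "20250827183040021.clean",
    "20250827183040021.clean.requested",  # skipped
]
for ts, action in list_instants(files):
    print(ts, action)
```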
So, again, the incremental read loses track of the commits that happen after the compaction once a clean has also taken place. When I add 'hoodie.datasource.read.incr.fallback.fulltablescan.enable': 'false', the incremental read succeeds.
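For completeness, the workaround is the same incremental read with the fallback flag disabled; only the options dict changes, the spark.read call stays as in the script above:

```python
# Incremental read options with the full-table-scan fallback disabled;
# with this flag set to 'false', the incremental read returned the
# correct (latest) rows in my testing.
read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '000',
    'hoodie.datasource.read.incr.fallback.fulltablescan.enable': 'false',
}
```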
I also tested this script with:
0.15.0 - works fine, no issue.
1.0.0 - works fine, no issue.
1.0.1 - I got this error and couldn't test:
{noformat}
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table test123. dbName cannot be null or empty
pyspark.errors.exceptions.captured.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table test123. dbName cannot be null or empty
{noformat}
> Incremental Reads Issue With Flink
> ----------------------------------
>
> Key: HUDI-9540
> URL: https://issues.apache.org/jira/browse/HUDI-9540
> Project: Apache Hudi
> Issue Type: Bug
> Affects Versions: 1.0.2
> Reporter: Hans Eschbaum
> Priority: Major
> Attachments: image-2025-08-27-15-25-33-375.png
>
>
> Hey!
> There seems to be an issue with incremental reads with MoR tables in Flink.
>
> When I read via PySpark with, e.g.,
> {code:java}
> read_options = {
> 'hoodie.datasource.query.type': 'incremental',
> 'hoodie.datasource.read.begin.instanttime': '0',
> }{code}
> Some records that show up in snapshot reads do not show up in incremental reads.
> I think this issue is likely related to the difference in MoR writes
> between Spark and Flink: Spark always puts inserts into the base file,
> while Flink puts inserts into the log files as well. For some reason,
> some or all of the inserts that exist only in the log files and haven't
> been compacted into base files are ignored during incremental reads.
> (My data has no deletes)
> Hudi version: 1.0.2
> Flink Version: 1.20.0
> Storage: S3
> (Writes are via AWS Managed Flink, reads are via EMR Serverless)
> To reproduce:
> 1. Do some writes via Flink to a Hudi sink
> 2. Compare an incremental read to a snapshot read, some of the records go
> missing.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)