[https://issues.apache.org/jira/browse/HUDI-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18016616#comment-18016616]

Hans Eschbaum edited comment on HUDI-9540 at 8/27/25 6:52 PM:
--------------------------------------------------------------

I managed to reproduce this purely in Spark as well, using 
[https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.5-bundle_2.12/1.0.2/hudi-spark3.5-bundle_2.12-1.0.2.jar]
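For completeness, this is roughly how a pyspark shell can be launched with that bundle. The flags below follow the standard Hudi-on-Spark setup (Kryo serializer, Hudi SQL extension and catalog); treat them as the usual defaults rather than anything specific to this bug, and adjust the jar path as needed:

```shell
# Launch a pyspark shell with the Hudi 1.0.2 / Spark 3.5 bundle on the classpath.
# These are the standard Hudi-on-Spark settings, not bug-specific flags.
pyspark \
  --jars hudi-spark3.5-bundle_2.12-1.0.2.jar \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
```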

 
{code:python}
from pyspark.sql import Row

# Run in a pyspark shell (SparkSession available as `spark`) with the
# hudi-spark3.5-bundle on the classpath.
path = "s3://dev/test_mor"

def upsert(rows, mode):
    hudi_options = {
        'hoodie.table.name': 'test123',
        'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
        'hoodie.datasource.write.recordkey.field': 'id',
        'hoodie.datasource.write.precombine.field': 'ts',
        'hoodie.datasource.hive_sync.enable': 'false',
        'hoodie.compact.inline': 'true',
        'hoodie.compact.inline.trigger.strategy': 'NUM_COMMITS',
        'hoodie.compact.inline.max.delta.commits': '3',
        'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
        'hoodie.clean.max.commits': '2',
        'hoodie.clean.async': 'false',
        'hoodie.clean.automatic': 'true',
        'hoodie.cleaner.commits.retained': '2',
        'hoodie.keep.max.commits': '4',
        'hoodie.keep.min.commits': '2',
        'hoodie.metadata.enable': 'false'
    }

    df = spark.createDataFrame([Row(**r) for r in rows])
    (df.write
       .format("hudi")
       .option('hoodie.datasource.write.operation', 'upsert')
       .options(**hudi_options)
       .mode(mode)
       .save(path))

# Eight upserts of the same key; inline compaction fires every 3 delta commits.
for i in range(1, 9):
    upsert([{"id": "id-1", "ts": i, "rider": f"r1{i}"}],
           "overwrite" if i == 1 else "append")

read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '000'
}

df = spark.read \
    .format('org.apache.hudi') \
    .options(**read_options) \
    .load(path)
print("incremental:")
df.show(truncate=False)

df = spark.read \
    .format('org.apache.hudi') \
    .load(path)
print("full:")
df.show(truncate=False)
{code}
 

 
{code}
incremental:
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+----+---+-----+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                          |id  |ts |rider|
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+----+---+-----+
|20250827183021346  |20250827183021346_0_3|id-1              |                      |0bddf683-34a7-407b-8807-33993405d6ca-0_0-342-1387_20250827183025449.parquet|id-1|6  |r16  |
+-------------------+---------------------+------------------+----------------------+---------------------------------------------------------------------------+----+---+-----+

full:
+-------------------+---------------------+------------------+----------------------+--------------------------------------+----+---+-----+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                     |id  |ts |rider|
+-------------------+---------------------+------------------+----------------------+--------------------------------------+----+---+-----+
|20250827183036110  |20250827183036110_0_7|id-1              |                      |0bddf683-34a7-407b-8807-33993405d6ca-0|id-1|8  |r18  |
+-------------------+---------------------+------------------+----------------------+--------------------------------------+----+---+-----+
{code}
 

Note the *ts* and *rider* columns: the incremental read is stuck at the compacted {{ts=6}}/{{r16}} record, while the snapshot read correctly returns {{ts=8}}/{{r18}}.

The last four timeline instants, in ascending order:
compaction: 20250827183025449
deltacommit: 20250827183029214
deltacommit: 20250827183036110
clean: 20250827183040021
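To make the expectation concrete, here is a tiny self-contained Python sketch (illustrative only, not Hudi internals) over the four instants above: both delta commits are newer than the last compaction, so an incremental read starting from '000' should still surface the final {{ts=8}}/{{r18}} record.

```python
# Illustrative only: given the timeline above, find the delta commits an
# incremental read should still surface after the latest compaction.
timeline = [
    ("compaction",  "20250827183025449"),
    ("deltacommit", "20250827183029214"),
    ("deltacommit", "20250827183036110"),
    ("clean",       "20250827183040021"),
]

def delta_commits_after_last_compaction(timeline):
    """Return deltacommit instants newer than the latest compaction instant."""
    compactions = [ts for action, ts in timeline if action == "compaction"]
    last_compaction = max(compactions) if compactions else "0"
    return [ts for action, ts in timeline
            if action == "deltacommit" and ts > last_compaction]

print(delta_commits_after_last_compaction(timeline))
# -> ['20250827183029214', '20250827183036110']
```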

So, again, the incremental read loses track of the delta commits that happen after the compaction once a clean has also taken place. When I add {{'hoodie.datasource.read.incr.fallback.fulltablescan.enable': 'false'}} to the read options, the incremental read returns the correct result.
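For reference, the working read configuration looks like this (the extra key is the one quoted above; the rest matches the script):

```python
# Incremental read options with the workaround applied. Disabling the
# full-table-scan fallback makes the incremental read return the delta
# commits written after the compaction.
read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '000',
    'hoodie.datasource.read.incr.fallback.fulltablescan.enable': 'false',
}

# Used exactly as before:
# df = spark.read.format('org.apache.hudi').options(**read_options).load(path)
```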

I also tested this script with other Hudi bundle versions:
 * 0.15.0 - works fine, no issue.
 * 1.0.0 - works fine, no issue.
 * 1.0.1 - could not test; the read failed with:

{code}
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table test123. dbName cannot be null or empty
pyspark.errors.exceptions.captured.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table test123. dbName cannot be null or empty
{code}



> Incremental Reads Issue With Flink
> ----------------------------------
>
>                 Key: HUDI-9540
>                 URL: https://issues.apache.org/jira/browse/HUDI-9540
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 1.0.2
>            Reporter: Hans Eschbaum
>            Priority: Major
>         Attachments: image-2025-08-27-15-25-33-375.png
>
>
> Hey!
> There seems to be an issue with incremental reads with MoR tables in Flink.
>  
> When I read via PySpark with options such as:
> {code:java}
> read_options = {
>     'hoodie.datasource.query.type': 'incremental',
>     'hoodie.datasource.read.begin.instanttime': '0',
> }{code}
> Some of the records do not show up that do show up with snapshot reads.
> I think this issue is likely related to the difference in MoR write behavior 
> between Spark and Flink: Spark always puts inserts into the base file, while 
> Flink puts inserts into the log files as well. For some reason, some or all 
> of the inserts that exist only in log files and haven't been compacted into 
> base files are ignored during incremental reads.
> (My data has no deletes)
> Hudi version: 1.0.2
> Flink Version: 1.20.0
> Storage: S3
> (Writes are via AWS Managed Flink, reads are via EMR Serverless)
> To reproduce:
> 1. Do some writes via Flink to a Hudi sink
> 2. Compare an incremental read to a snapshot read, some of the records go 
> missing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
