[ 
https://issues.apache.org/jira/browse/HUDI-4726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HunterXHunter updated HUDI-4726:
--------------------------------
    Description: 
When using Flink for incremental query, when `read.start-commit is out of 
range`, full table scanning should not be performed.
{code:java}
-- create
CREATE TABLE hudi_4726(
id string,
msg string,
`partition` STRING,
PRIMARY KEY(id) NOT ENFORCED
)PARTITIONED BY (`partition`)
 WITH (
        'connector' = 'hudi',
        'write.operation'='upsert',
        'path' = 'hudi_4726',
        'index.type' = 'BUCKET',
        'hoodie.bucket.index.num.buckets' = '2', 
       'compaction.delta_commits' = '2', 
       'table.type' = 'MERGE_ON_READ', 
       'compaction.async.enabled'='true')
-- insert 
INSERT INTO hudi_4726 values ('id1','t1','par1')
INSERT INTO hudi_4726 values ('id1','t2','par1')
INSERT INTO hudi_4726 values ('id1','t3','par1')
INSERT INTO hudi_4726 values ('id1','t4','par1')
-- .hoodie
t1.deltacommit  (t1)
t2.deltacommit  (t2)
t3.commit       (t2)
t4.deltacommit  (t3)
t5.deltacommit  (t4)
t6.commit       (t4)

t3.parquet
t6.parquet
-- read

exp1 : 'read.start-commit'='t1', 'read.end-commit'='t1'  -- (true,+I[id1, t1, 
par1])
exp2 : 'read.start-commit'='t1', 'read.end-commit'='t2' -- (true,+I[id1, t2, 
par1])
exp3 : 'read.start-commit'='t1', 'read.end-commit'='t3' -- (true,+I[id1, t2, 
par1])
-- but 
'read.start-commit'='0', 'read.end-commit'='t3' -- (nothing) -- expect should 
be like exp3.
'read.start-commit'='0', 'read.end-commit'='t4' -- (nothing) -- expect should 
be (true,+I[id1, t3, par1]). 
'read.start-commit'='0', 'read.end-commit'='t5' -- (true,+I[id1, t4, par1]) 
this is right{code}
The root of the problem is `IncrementalInputSplits.inputSplits`, because 
`startCommit` is out of range, `fullTableScan` is `true`, finally, the file 
read is t6..parquet instead of t3.parquet.

 

  was:
 
{code:java}
-- create
CREATE TABLE hudi_4726(
id string,
msg string,
`partition` STRING,
PRIMARY KEY(id) NOT ENFORCED
)PARTITIONED BY (`partition`)
 WITH (
        'connector' = 'hudi',
        'write.operation'='upsert',
        'path' = 'hudi_4726',
        'index.type' = 'BUCKET',
        'hoodie.bucket.index.num.buckets' = '2', 
       'compaction.delta_commits' = '2', 
       'table.type' = 'MERGE_ON_READ', 
       'compaction.async.enabled'='true')
-- insert 
INSERT INTO hudi_4726 values ('id1','t1','par1')
INSERT INTO hudi_4726 values ('id1','t2','par1')
INSERT INTO hudi_4726 values ('id1','t3','par1')
INSERT INTO hudi_4726 values ('id1','t4','par1')
-- .hoodie
t1.deltacommit  (t1)
t2.deltacommit  (t2)
t3.commit       (t2)
t4.deltacommit  (t3)
t5.deltacommit  (t4)
t6.commit       (t4)

t3.parquet
t6.parquet
-- read

exp1 : 'read.start-commit'='t1', 'read.end-commit'='t1'  -- (true,+I[id1, t1, 
par1])
exp2 : 'read.start-commit'='t1', 'read.end-commit'='t2' -- (true,+I[id1, t2, 
par1])
exp3 : 'read.start-commit'='t1', 'read.end-commit'='t3' -- (true,+I[id1, t2, 
par1])
-- but 
'read.start-commit'='0', 'read.end-commit'='t3' -- (nothing) -- expect should 
be like exp3.
'read.start-commit'='0', 'read.end-commit'='t4' -- (nothing) -- expect should 
be (true,+I[id1, t3, par1]). 
'read.start-commit'='0', 'read.end-commit'='t5' -- (true,+I[id1, t4, par1]) 
this is right{code}
The root of the problem is `IncrementalInputSplits.inputSplits`, because 
`startCommit` is out of range, `fullTableScan` is `true`, finally, the file 
read is t6..parquet instead of t3.parquet.

 


> When using Flink for incremental query, when `read.start-commit is out of 
> range`, full table scanning should not be performed.
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-4726
>                 URL: https://issues.apache.org/jira/browse/HUDI-4726
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>            Reporter: HunterXHunter
>            Assignee: HunterXHunter
>            Priority: Major
>
> When using Flink for incremental query, when `read.start-commit is out of 
> range`, full table scanning should not be performed.
> {code:java}
> -- create
> CREATE TABLE hudi_4726(
> id string,
> msg string,
> `partition` STRING,
> PRIMARY KEY(id) NOT ENFORCED
> )PARTITIONED BY (`partition`)
>  WITH (
>         'connector' = 'hudi',
>         'write.operation'='upsert',
>         'path' = 'hudi_4726',
>         'index.type' = 'BUCKET',
>         'hoodie.bucket.index.num.buckets' = '2', 
>        'compaction.delta_commits' = '2', 
>        'table.type' = 'MERGE_ON_READ', 
>        'compaction.async.enabled'='true')
> -- insert 
> INSERT INTO hudi_4726 values ('id1','t1','par1')
> INSERT INTO hudi_4726 values ('id1','t2','par1')
> INSERT INTO hudi_4726 values ('id1','t3','par1')
> INSERT INTO hudi_4726 values ('id1','t4','par1')
> -- .hoodie
> t1.deltacommit  (t1)
> t2.deltacommit  (t2)
> t3.commit       (t2)
> t4.deltacommit  (t3)
> t5.deltacommit  (t4)
> t6.commit       (t4)
> t3.parquet
> t6.parquet
> -- read
> exp1 : 'read.start-commit'='t1', 'read.end-commit'='t1'  -- (true,+I[id1, t1, 
> par1])
> exp2 : 'read.start-commit'='t1', 'read.end-commit'='t2' -- (true,+I[id1, t2, 
> par1])
> exp3 : 'read.start-commit'='t1', 'read.end-commit'='t3' -- (true,+I[id1, t2, 
> par1])
> -- but 
> 'read.start-commit'='0', 'read.end-commit'='t3' -- (nothing) -- expect should 
> be like exp3.
> 'read.start-commit'='0', 'read.end-commit'='t4' -- (nothing) -- expect should 
> be (true,+I[id1, t3, par1]). 
> 'read.start-commit'='0', 'read.end-commit'='t5' -- (true,+I[id1, t4, par1]) 
> this is right{code}
> The root of the problem is `IncrementalInputSplits.inputSplits`, because 
> `startCommit` is out of range, `fullTableScan` is `true`, finally, the file 
> read is t6..parquet instead of t3.parquet.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to