[ https://issues.apache.org/jira/browse/HUDI-7975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17882543#comment-17882543 ]

Krishen Bhan edited comment on HUDI-7975 at 9/19/24 9:39 PM:
-------------------------------------------------------------

Thanks for sharing this example. I had a concern about sudden "large" cleans for a 
dataset with an insert + bulk-insert workload. 
As an example, assume that num_commits for clean is 25 and num_commits for 
archival is 75, and that due to delayed clean+archival the active timeline is in 
the following state.
(By the way, I am assuming that in the above proposal HUDI will do clean 
planning as usual if it sees at least one replacecommit/cluster instant, since 
that would be inexpensive to add.)

 
{code:java}
ECTR = C1
[c1.commit, c2.commit . . . c10.commit, . . . c99.commit]
where c1 to c10 are inserts, and c11 - c99 are bulk inserts{code}
If we apply this proposal to 0.x, my understanding is that the following can 
happen:
 # Clean sees that the last 50-55 commits (2 * num_commits_clean + 5) are all 
bulk inserts, and doesn't schedule a clean
 # Archival then runs and archives c1 to c24, since there is no 
replace/inflight/savepoint instant blocking it
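To make the skip condition concrete, here is a minimal sketch of how I understand the proposed heuristic (class and method names are hypothetical, not the actual Hudi API):

```java
import java.util.List;

// Hypothetical sketch of the proposed shortcut: if every commit in the
// lookback window (2 * num_commits_clean + 5) is a bulk insert -- i.e.
// nothing updated or replaced an existing file group -- clean planning
// is skipped entirely.
public class CleanSkipSketch {
    static boolean shouldScheduleClean(List<String> actionsNewestFirst, int numCommitsClean) {
        int lookback = 2 * numCommitsClean + 5;
        int n = Math.min(lookback, actionsNewestFirst.size());
        for (int i = 0; i < n; i++) {
            if (!"bulk_insert".equals(actionsNewestFirst.get(i))) {
                return true; // some commit in the window may have left older file versions behind
            }
        }
        return false; // window is all bulk inserts -> skip clean planning
    }
}
```

With num_commits for clean = 25 the window is 55 commits, which is why c1 to c10 can slip out of view once archival removes them.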

 

Now the timeline is in the following state:
{code:java}
ECTR = C1 
[c25 . . c99.commit] 
where c25 - c99 are bulk inserts{code}
The dataset is not inconsistent, but it has partitions with old file versions.

 

Now let's say time passes and 50 more commit instants get added to the timeline, 
but one of the new instants is an insert instead of a bulk insert:
{code:java}
ECTR = C1 
[c25 . . c99.commit, c100.commit . . . c124.commit, . . . c149.commit] 
Where c124 is an insert but all other new instants are bulk inserts{code}
The next time clean runs, it will find that c124 was an insert that has left 
to-be-cleaned files in the dataset (since c124 was within the lookback window of 
50-55 instants) and will correctly schedule a clean targeting all instants 
before c125. Because the current ECTR C1 is no longer in the active timeline, 
though, it will do a "full scan" clean: read all partitions in the dataset and 
determine which are subject to cleaning. The final list of partitions to process 
includes not only the partitions affected by c124, but also the partitions 
affected by c1 to c10.
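A toy contrast of the two planning modes, just to illustrate the asymmetry (hypothetical names, not the actual clean planner API): once the previous ECTR has been archived off the active timeline, the planner's work is proportional to the whole table rather than to recent write activity.

```java
import java.util.List;

// Toy sketch: incremental planning is bounded by recent write activity,
// while the full-scan fallback must consider every partition in the table.
public class CleanPlanModeSketch {
    static List<String> partitionsToScan(boolean ectrOnActiveTimeline,
                                         List<String> touchedSinceLastClean,
                                         List<String> allTablePartitions) {
        if (ectrOnActiveTimeline) {
            return touchedSinceLastClean; // incremental: small, predictable
        }
        return allTablePartitions; // full scan: grows with table size
    }
}
```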

The concern I had is that this occasional "larger" clean may take a lot of time 
away from the writer, causing spikes in (resource x time) that may be 
unpredictable from the perspective of the user. (From personal experience with 
0.10 I've seen it cause OOM issues, but for simplicity's sake I'll assume many 
of those are already fixed in later hudi versions, so I won't bring that up as a 
concern here.) And the longer this clean scheduling is deferred, the more 
partitions/files potentially have to be processed and cleaned by this next 
full-scan clean. The reason I am focusing on the full-scan clean is that, AFAIK, 
it cannot be "split" across multiple clean instants (each with a "bound" on the 
files/partitions to process). This makes orchestration a bit trickier in my 
opinion, as you may now have to have your ingestion writer block out 
time/resources for these occasional large cleans, and make sure the 
timeouts/resources you set for the clean writer are enough to account for this 
case.

My understanding based on our offline discussion is that there are two issues 
we would like to resolve with this proposal:
 - The issue I brought up of having an occasional expensive full-scan clean on 
an insert-only dataset whenever there's a surge of non-insert instants
 - The optimization you highlighted, where HUDI clean planning should reduce 
latency for cleans on insert-only low-latency workloads by not having to read 
every single .commit metadata file (1 I/O + deserialization call per file in 
the active write timeline)

Just thinking out loud, from here we could explore two avenues:
 - Continue with your proposal here, but also look into optimizations for the 
"full scan" clean, such as allowing a user to schedule & execute multiple 
"partial" cleans that each have a bound. This way a large clean backlog can be 
gradually worked off over the course of multiple jobs (so if a low-latency 
ingestion writer doesn't have many resources, after each write commit it can do 
a smaller clean that makes some progress).

 - Store a subset (or rather, a subsequence) of the instants in the active 
timeline that have updated/replaced a file group (replacecommit, update, small 
file handling, etc.) in some file. This file would be updated any time there is 
a write or clean, so it would not need to be computed by reading every instant 
on the timeline. The idea is that, using this, archival can efficiently block 
on the earliest instant that hasn't been cleaned (lessening the archival -> 
clean dependency that you were concerned about during our offline discussion), 
and the clean planner can efficiently see whether or not it should schedule a 
clean. Since neither table service would then need to read the metadata file of 
every insert-only commit in the timeline, it should be feasible for a 
low-latency ingestion writer to do.
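A rough sketch of what the bounded "partial" clean from the first avenue could look like (hypothetical; today the full-scan clean produces a single plan): the backlog is chunked so each clean instant processes at most a fixed number of partitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split one large clean backlog into several bounded
// plans, each of which could back a separate clean instant.
public class PartialCleanSketch {
    static List<List<String>> planPartialCleans(List<String> backlog, int maxPartitionsPerClean) {
        List<List<String>> plans = new ArrayList<>();
        for (int i = 0; i < backlog.size(); i += maxPartitionsPerClean) {
            int end = Math.min(i + maxPartitionsPerClean, backlog.size());
            plans.add(new ArrayList<>(backlog.subList(i, end)));
        }
        return plans;
    }
}
```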
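The bookkeeping for the second avenue could be as simple as an ordered set of instant times persisted alongside the timeline (everything below is hypothetical naming, not an existing Hudi structure):

```java
import java.util.TreeSet;

// Hypothetical sketch of a side index of instants that updated/replaced a
// file group. Writers append to it on commit; clean prunes entries it has
// fully processed; archival blocks on the earliest surviving entry.
public class MutatingInstantsIndex {
    private final TreeSet<String> instants = new TreeSet<>();

    void onWriteCommit(String instantTime, boolean mutatedFileGroups) {
        if (mutatedFileGroups) {
            instants.add(instantTime);
        }
    }

    void onCleanCompleted(String newEarliestCommitToRetain) {
        // everything strictly before the new earliest-commit-to-retain is cleaned
        instants.headSet(newEarliestCommitToRetain).clear();
    }

    // archival must not move past this instant; null means nothing pending
    String earliestUncleanedInstant() {
        return instants.isEmpty() ? null : instants.first();
    }

    boolean cleanNeeded() {
        return !instants.isEmpty();
    }
}
```

Both archival and the clean planner would then consult this small file instead of reading the metadata of every insert-only commit on the timeline.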

 



> Transfer extra metadata to new commits when new data is not ingested to trigger 
> table services on the dataset
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-7975
>                 URL: https://issues.apache.org/jira/browse/HUDI-7975
>             Project: Apache Hudi
>          Issue Type: Improvement
>            Reporter: Surya Prasanna Yalla
>            Assignee: Surya Prasanna Yalla
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
