[
https://issues.apache.org/jira/browse/HUDI-7975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17882543#comment-17882543
]
Krishen Bhan edited comment on HUDI-7975 at 9/18/24 11:45 PM:
--------------------------------------------------------------
Thanks for sharing this example; I have a concern about the following scenario.
Assume that num_commits for clean is 25 and num_commits for archival is 75,
and that due to delayed clean+archival the active timeline is in the following
state. (By the way, I am assuming that in the above proposal HUDI will do clean
planning as usual if it sees at least one replacecommit/cluster instant, since
that would be inexpensive to add.)
{code:java}
ECTR = C1
[c1.commit, c2.commit . . . c10.commit, . . . c99.commit]
where c1 to c10 have updates or small file writes, and c11 - c99 are solely
inserts{code}
If we apply this proposal to 0.x, my understanding is that the following can
happen:
# Clean sees that the last 50-55 instants (2 * num_commits_clean + 5) are all
inserts, and doesn't schedule a clean
# Archival then runs and archives c1 to c24, since there is no
replacecommit/inflight/savepoint blocking it
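To make step 1 concrete, here is a rough pseudo-Java sketch of the skip check (class and method names are made up by me, not actual Hudi APIs):

```java
import java.util.List;

public class CleanSkipCheck {
    // actions: one entry per instant on the active timeline, e.g.
    // "insert", "update", "replacecommit" (names are illustrative only).
    public static boolean shouldScheduleClean(List<String> actions, int numCommitsClean) {
        int lookback = 2 * numCommitsClean + 5;  // 55 when numCommitsClean = 25
        int from = Math.max(0, actions.size() - lookback);
        for (String action : actions.subList(from, actions.size())) {
            // A replacecommit/cluster or any non-insert write forces clean planning.
            if (!action.equals("insert")) {
                return true;
            }
        }
        return false;  // the whole lookback window is insert-only: skip scheduling
    }
}
```

With the timeline above, c1 to c10 fall outside the 55-instant window, so the check returns false even though their older file versions are still uncleaned.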
Now the timeline is in the following state:
{code:java}
ECTR = C1
[c25 . . c99.commit]
where c25 - c99 are solely inserts {code}
The dataset is not inconsistent, but it has partitions with old file versions.
Now let's say time passes, 50 more commit instants are added to the timeline,
and one of them (c124) is not solely an insert:
{code:java}
ECTR = C1
[c25 . . c99.commit, c100.commit . . . c124.commit, . . . c149.commit]
Where c124 updated a file group but all other instants are solely inserts.{code}
The next time clean runs, it will find that c124 was not just an insert (since
it is within the lookback window of 50-55 instants) and will correctly schedule
a clean targeting all instants before c125. Because the current ECTR C1 is no
longer in the active timeline, though, it will do a "full scan" clean and read
all partitions in the dataset to determine which are subject to cleaning. The
final list of partitions to process includes not only partitions affected by
c124, but also partitions affected by c1 to c10.
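Roughly, the incremental-vs-full-scan decision I am describing looks like this (a hand-wavy sketch with invented names, not Hudi's actual planner code):

```java
import java.util.List;
import java.util.Set;

public class CleanPlanSketch {
    // Decide which partitions the clean planner must inspect.
    public static List<String> partitionsToScan(Set<String> activeTimelineInstants,
                                                String earliestCommitToRetain,
                                                List<String> partitionsTouchedSinceEctr,
                                                List<String> allTablePartitions) {
        if (activeTimelineInstants.contains(earliestCommitToRetain)) {
            // Incremental clean: only partitions written since the previous ECTR.
            return partitionsTouchedSinceEctr;
        }
        // ECTR was archived away -> "full scan": every partition in the table
        // must be listed and checked.
        return allTablePartitions;
    }
}
```

The full-scan branch is the one whose cost grows with the whole table, independent of how small the recent writes were.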
The concern I had is that this occasional "larger" clean may take up a lot of
the writer's time, causing spikes in (resource x time) that may be
unpredictable from the user's perspective. (From personal experience with 0.10
I've seen it cause OOM issues, but for simplicity's sake I'll assume many of
those are already fixed in later Hudi versions.) And the longer this clean
scheduling is deferred, the more partitions/files may have to be processed and
cleaned by the next full-scan clean. The reason I am focusing on the full-scan
clean is that, AFAIK, it cannot be "split" across multiple clean instants (each
with a "bound" on the files/partitions to process). This makes orchestration a
bit trickier in my opinion, since you may have to have your ingestion writer
block out time/resources for these occasional large cleans, and make sure the
timeouts/resources you set for the clean writer are enough to account for this
case.
My understanding based on our offline discussion is that there are two issues
we would like to resolve with this proposal:
- The issue I brought up of having an occasional expensive full-scan clean on
an insert-only dataset whenever there's a surge of non-insert instants
- The optimization you highlighted where HUDI clean planning should reduce
latency for cleans on insert-only low-latency workloads by not having to read
every single .commit metadata file (1 I/O + deserialization call per file in
the active write timeline)
Just thinking out loud, from here we could explore two avenues:
- Continue with your proposal here, but also look into optimizations for "full
scan" clean, such as allowing a user to schedule & execute multiple "partial"
cleans that each have a bound. This way a large clean backlog can be gradually
worked through over the course of multiple jobs.
- Store in some file the subset of instants in the active timeline that have
updated/replaced a file group (replacecommit, update, small file handling,
etc). This file would be updated any time there is a write or clean. The idea
is that archival can then efficiently block on the earliest instant that hasn't
been cleaned, and the clean planner can efficiently see whether or not it
should schedule a clean, since then neither table service will need to read the
metadata file of every commit instant in the timeline.
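A rough sketch of what I mean by that second avenue (purely hypothetical design, all names invented; a persisted version would of course also need to handle concurrent writers):

```java
import java.util.TreeSet;

public class MutatingInstantsIndex {
    // Instants that updated/replaced a file group and have not been cleaned yet,
    // ordered by instant time.
    private final TreeSet<String> uncleanedMutatingInstants = new TreeSet<>();

    // Called on every completed write that updated/replaced a file group
    // (replacecommit, update, small file handling, etc).
    public void onMutatingWrite(String instantTime) {
        uncleanedMutatingInstants.add(instantTime);
    }

    // Called when a clean completes; entries at or before its boundary are done.
    public void onClean(String cleanedUpToInstant) {
        uncleanedMutatingInstants.headSet(cleanedUpToInstant, true).clear();
    }

    // Clean planner: cheap check instead of reading every .commit file.
    public boolean shouldScheduleClean() {
        return !uncleanedMutatingInstants.isEmpty();
    }

    // Archival: earliest instant it must not archive past (null = unblocked).
    public String archivalBlocker() {
        return uncleanedMutatingInstants.isEmpty()
                ? null : uncleanedMutatingInstants.first();
    }
}
```

The point is that both table services get an O(1) lookup against a structure whose size is bounded by the number of uncleaned mutating instants, not by the length of the active timeline.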
> Transfer extra metadata to new commits when new data is not ingested to trigger
> table services on the dataset
> -----------------------------------------------------------------------------------------------------------
>
> Key: HUDI-7975
> URL: https://issues.apache.org/jira/browse/HUDI-7975
> Project: Apache Hudi
> Issue Type: Improvement
> Reporter: Surya Prasanna Yalla
> Assignee: Surya Prasanna Yalla
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)