nsivabalan commented on code in PR #11555:
URL: https://github.com/apache/hudi/pull/11555#discussion_r1899852589


##########
rfc/rfc-79/rfc-79.md:
##########
@@ -0,0 +1,314 @@
+w<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+file distributed with this work for additional information regarding copyright 
ownership. The ASF licenses this file to
+You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with the 
License. You may obtain a copy of the License
+at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "
+AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License for the specific
+language governing permissions and limitations under the License. -->
+
+# Add support for cancellable clustering table service plans
+
+## Proposers
+
+Krishen Ben (kbuci)
+
+## Approvers
+
+Sivabalan Narayanan (nsivabalan)
+
+## Status
+
+In Progress
+
+JIRA: HUDI-7946
+
+## Abstract
+
+Clustering is a table service that assist with optimizing table/files layout 
in Hudi to speed up read queries. Currently
+ingestion writers that runs conrrently with clustering and having overlap with 
the data being touched by both the
+writers (ingestion and table service), ingestion writers are aborted. Thus the 
clustering table service plans can delay
+ingestion writes from updating a dataset with recent data if potential write 
conflicts are detected. Furthermore, a
+clustering plan that isn't executed to completion for a large amount of time 
(due to repeated failures, application
+misconfiguration, or insufficient resources) will degrade the read/write 
performance of a dataset due to delaying clean,
+archival, and metadata table compaction. This is because currently HUDI 
clustering plans, upon being scheduled, must be
+executed to completion. And additonally, this will prevent any ingestion write 
targeting the same files from succeeding
+(due to posing as a write conflict) as well as can prevent new table service 
plans from targeting the same files. This
+RFC proposes to support "Cancellable" Clustering plans. Support for such 
cancellable clustering plans will provide HUDI
+an avenue to fully cancel a clustering plan and allow other table service and 
ingestion writers to proceed and avoiding
+starvation based on user needs.
+
+## Background
+
+### Current state of Execution of table service operations in Hudi
+
+As of now, the table service operations `COMPACT` and `CLUSTER` are implicitly 
"immutable" plans by default, meaning
+that once a plan is scheduled, it will stay as a pending instant until a 
caller invokes the table service execute API on
+the table service instant and successfully completes it. Specifically, if an 
inflight execution fails after
+transitioning the instant to inflight, the next execution attempt will 
implictly create and execute a rollback plan (
+which will delete all new instant/data files), but will keep the table service 
plan. And then the table service will be
+re-attempted. This process will repeat until the instant is completed. The 
below visualization captures these
+transitions at a high level
+
+![table service lifecycle 
(1)](https://github.com/user-attachments/assets/4a656bde-4046-4d37-9398-db96144207aa)
+
+## Clean and rollback of failed writes
+
+The clean table service, in addition to performing a clean action, is 
responsible for rolling back any failed ingestion
+writes (non-clustering/non-compaction inflight instants that are not being 
concurrently executed by a writer). This
+means that inflight clustering plans are not currently subject to clean's 
rollback of failed writes. As detailed below,
+this proposal for supporting cancellable clustering will benefit from enabling 
clean be capable of targeting such
+cancellable clustering plans.
+
+## Goals
+
+### (A) An ingestion job should be able to cancel and ignore any inflight 
cancellable clustering instants targeting the same data as the ingestion writer.
+
+The current requirement of HUDI needing to execute a clustering plan to 
completion, forces ingestion writers to abort a
+commit if a table service plan is conflicting. Becuase an ingestion writer 
typically determines the exact file groups it
+will be updating/replacing after building a workload profile and performing 
record tagging, the writer may have already
+spent a lot of time and resources before realizing that it needs to abort. In 
the face of frequent table service plans
+or an old inflight plan, this will cause delays in adding recent upstream 
records to the dataset as well as
+unnecessairly take away resources (such as Spark executors in the case of the 
Spark engine) from other applications in
+the data lake. Making the clustering plan cancellable should avoid this 
situation by allowing ingestion to request all
+conflicting cancellable clustering plans to be "cancelled" and ignore inflight 
plans that already have been requested
+for cancellation. The latter will ensure that any incomplete cancellable 
clustering instants that have been requested
+for cancellation but have not yet been aborted, can be ignored by ingestion 
writers.
+
+### (B) A cancellable table service plan should be eligible for cancellation 
at any point before committing
+
+In conjunction with (A), any caller(ingestion writer for eg) should be able to 
request cancellation for an inflight
+cancellable clustering plan. We should not need any synchronous mechanism 
where in the clustering plan of interest
+should be aborted completely before which the ingestion writer can proceeed. 
We should have alight weight mechanism with
+which the ingestion writer make a cancellation request and moves on to carry 
out its operation with the assumption that
+the respective clustering plan will be aborted. This requirement is needed due 
to presence of concurrent and async
+writers for clustering execution, as another worker should not need to wait 
(for the respective concurrent clustering
+worker to proceed with execution or fail) before confirming that its 
cancellation request will be honored. Once the
+request for cancellation succeeds, all interested entities like the ingestion 
writer, reader, asynchronous clusteirng
+execution job should assume the clustering plan is cancelled.
+
+### (C) An incomplete cancellable clustering plan should eventually have its 
partial writes cleaned up.
+
+Although a cancellable clustering plan being requested for cancellation can 
ensure that the instant is never committed,
+there still needs to be a mechanism to have its data and instant files cleaned 
up permanantly. The clustering worker
+itself should be able to do the cleanup, but this may not be sufficient as 
transient failures/resource allocation issues
+can prevent workers from re-attempting their execution of the plan. In such 
cases, the `CLEAN` table service can be used
+to guarantee, that an incomplete cancellable plan which has a request for 
cancellation, is eventually cleaned up(rolled
+back and aborted), since datasets that undergo clustering are anyway expected 
to undergo regular clean operations.
+Because an inflight plan remaining on the timeline can degrade performance of 
reads/writes (as mentioned earlier), a
+cancellable table service plan should be elligible to be targeted for cleanup 
if it has been already requested for
+cancellation or if HUDI clean, deems that it has remained inflight for too 
long (or some other critera). In other words,
+the two main scenarios that `CLEAN` will now address are
+
+- If a cancellable clustering plan is scheduled but is never successfully 
executed (due to the corresponding worker
+  never running or repeatedly failing), `CLEAN` will requested the instant for 
cancellation (allowed by (B) ) and clean
+  it up. This has a chance of prematurely cleaning up a cancellable clustering 
plan before it has the chance to be
+  executed (if execution of clustering is handled async) but that can be 
minimized by allowing clean to only do this if
+  instant is very old.
+- If a cancellable clustering plan was requested for cancellation but never 
cleaned up (again due to the corresponding
+  worker never running or repeatedly failing), `CLEAN` will take up task of 
cleaning up the inflight instant.
+
+Note that a failed cancellable clustering plan should still be able to be 
safely cleaned up immediately - the goal here
+is just to make sure, an inflight plan won't stay on the timeline for an 
unbounded amount of time but also won't be
+likely to be prematurely cleaned up by clean before it has a chance to be 
executed.
+
+## Design
+
+### Enabling a clustering plan to be cancellable
+
+To satisfy goal (A), a new config flag named "cancellable" can be added to a 
clustering plan. A writer that intends to
+schedule a cancellable table service plan, can enable the flag in the 
serialized plan metadata. Any writer executing the
+plan can infer that the plan is cancellable, and when trying to commit the 
instant should abort, if it detects that is
+has been requested for cancellation. As a future optimization, the cancellable 
clustering worker can continually poll
+during its execution to see if it has been requested for cancellation. On the 
other side, with the ingestion writer
+flow, the commit finalization logic for ingestion writers can be updated to 
ignore any inflight clustering plans if they
+are cancellable. For the purpose of this design proposal, consider the 
existing ingestion write flow as having three
+steps:
+
+1. Schedule itself on the timeline with a new instant time in a .requested file
+2. Process/record tag incoming records, build a workload profile, and write 
the updating/replaced file groups to a "
+   inflight" instant file on the timeline. Check for conflicts and abort if 
needed.
+3. Perform write conflict checks and commit the instant on the timeline
+
+The aforementioned changes to ingestion and clustering flow will ensure that 
in the event of a conflicting ingestion and
+cancellable table service writer, the ingestion job will take precedence (and 
cause the cancellable table service
+instant to eventually cancel) as long as a cancellable clustering plan hasn't 
be completed before (2). Since if the
+cancellable table service has already been completed before (2), the ingestion 
job will see that a completed instant (a
+cancellable table service action) conflicts with its ongoing inflight write, 
and therefore it would not be legal to
+proceed. On such cases, ingestion writer will have to abort itself instead of 
proceeding to completion.
+
+### Adding a cancel action and aborted state for cancellable plans
+
+This proposed design will also involve adding a new instant state and internal 
hoodie metadata directory, by making the
+following changes:
+
+#### Cancel action
+
+* We are proposing to add a new .hoodie/.cancel folder, where each file 
corresponds to an instant time that a writer
+  requested for cancellation. As will be detailed below, a writer can cancel 
an inflight plan by adding the instant to
+  this directory and execution of table service will not allow an instant to 
be committed, if it appears in this
+  /.cancel directory. The new /.cancel folder will enable goals (A) & (B) by 
allowing writers to permentantly prevent an
+  ongoing cancellable table service write from committing by requesting for 
cancellation, without needing to block/wait
+  for the table service writer. Once an instant is requested for cancellation 
(added to /.cancel) it cannot be revoked (
+  or "
+  un-cancelled") - it must be eventually transitioned to aborted state, as 
detailed below. To implement (A), ingestion
+  will be updated such that during write-conflict detection, it will create an 
entry in /.cancel for any cancellable
+  plans with a detected write conflict and will ignore any candidate inflight 
plans that have an entry in /.cancel.
+
+#### Aborted state
+
+* We are proposing to add an ".aborted" state type for cancellable table 
service plan. This state is terminal and with
+  this new addition, an instant can only be transitioned to .*commit or 
.aborted (not both) or could be rolledback (
+  ingestion writes). The new ".aborted" state will allow writers to infer 
whether a cancelled table service plan needs
+  to still have it's partial data writes cleaned up from the dataset, which is 
needed for (C). Once an instant appears
+  in /.cancel folder, it can and must eventually be transitioned to .aborted 
state. This new state will ensure that
+  cancelled instants are eventually "cleaned up" from the dataset and internal 
timeline metadata. The additional design
+  change below will complete the remaining requirement for (C) of eventual 
cleanup.
+
+### Handling cancellation of plans
+
+An additional config "cancellation-policy" can be added to the table service 
plan to indicate when it is eligible to be
+cancelled and aborted by `CLEAN` operation. This policy can be a threshold of 
hours or instants on timeline, where if
+that # of hours/instants have elapsed since the plan was scheduled(w/o being 
executed), any call to `CLEAN` operation
+will also cancel and abort the instant. This policy should be configured by 
the worker scheduling a cancellable table
+service plan, based on the amount of time they expect the plan to remain on 
the timeline before being picked up for
+execution. For example, if the plan is expected to have its execution deferred 
to a few hours later, then the
+cancellation-policy should be lenient in allowing the plan to remain many 
hours on the timeline before it could be
+deemed eligible for abortion. Note that this cancellation policy is not used 
to infer whether a clustering plan is
+currently being executed - similar to how concurrent ingestion writes are 
rolled back by lazy clean policy, A
+cancellable clustering plan can only be aborted once it is confirmed that an 
ongoing writer is no longer progressing it.
+
+In order to ensure that other writers can indeed permanantely cancel a 
cancellable table service plan (such that it can
+no longer be executed), additional changes to clean and table service flow 
will be need to be added as well, as will be
+proposed below. Also, note that the cancellation-policy is only required to be 
honored by `CLEAN`: a user can choose
+setup an application to aggresively cancel a failed cancellable table service 
plan even if it has not meet the critera
+for its cancellation-policy yet. This can be useful if a user wants a utility 
to manually ensure that clean/archival for
+a dataset progresses immediately or knows that a cancellable table service 
plan will not be attempted again or cancelled
+& aborted by a table service worker in the future. The two new cancel APIs in 
the below proposal provide a method to
+achieve this.
+
+#### Enabling clustering execution and clean table service to support 
cancellation and automatic cleanup
+
+The clustering execution flow will be updated to check the /.cancel folder 
during a pre-commit check before completing
+the instant. If the instant is a target of /.cancel, then all of its data 
files will be deleted and the instant will be
+transitioned to .aborted. These checks will be performed within a transaction, 
to guard against callers cancelling an
+already-committed instant. In addition, in order to avoid the scenario of 
writer executing an instant but having its
+data files being deleted by a concurrent caller cancelling & aborting the 
instant, the clustering execution flow will
+perform heartbeats. If an instant has an active heartbeat it can be requested 
for cancellation (by adding an instant in
+/.cancel) but it cannot yet be cleaned up and transitioned to .aborted state - 
this is sufficient to safely implement
+goal (B) with respect to concurrent workers.
+
+The below visualization shows the flow for cancellable table service plans 
(steps that are already in existing table
+service flow are grey-ed out)
+
+![cancel table service lifecycle with lock 
(5)](https://github.com/user-attachments/assets/c05dae68-4330-4d85-b29e-e3e47754509e)
+
+Having this new .hoodie/.cancel folder (in addition to only having the 
.aborted state) is needed not only to allow any
+caller to forcebily block an instant from being committed, but also to prevent 
the need for table service workers to
+also perform write conflict detection (that ingestion already will perform) or 
unnecessarily re-attempt execution of the
+instant if it has been already been requested for cancellation but not 
succefully aborted yet. The below visualized
+scenario shows how this clustering attempt will "short circuit" in this manner 
by checking /.cancel to see if clustering
+execution should even proceed. This scenario also includes an example of 
concurrent writers to show how transaction and
+heartbeating in the above proposed flow will allow correct behavior even in 
the face of concurrent writers attempting to
+execute and/or cancel the instant.
+
+![cancel flow table serivce 
(1)](https://github.com/user-attachments/assets/f130f326-952f-49eb-bdbb-b0b34206f677)
+
+Aside from modifications to the clustering execution flow, a new pair of 
cancel APIs request_cancel and execute_abort
+will be added for all writers to use. They will allow a writer to request an 
instant to be cancelled (by adding it to
+/.cancel) and transition an instant to the terminal .aborted state, 
respectively.
+
+The new cancel API request_cancel will perform the following steps
+
+1. Start a transaction
+2. Reload active timeline
+3. If instant is already comitted, throw an exception
+4. If instant is already aborted, exit without throwing exception
+5. create a file under /.cancel with the target instant name
+6. End the transaction
+
+If this call succeeds, then the caller can assume that the target instant will 
not be commited and can only transitioned
+to .aborted state from that point onwards.
+
+The other API execute_abort will be added which allows a writer to explictly 
abort the target instant that has already
+had its cancellation requested (by adding the instant under /.cancel). Note 
that this API will also do cleaning up all
+leftover files in the process. Unlike request_cancel though, it cannot be 
completed if another writer is still executing
+the instant. In order to enforce this, heartbeating will be used to allow 
writers to infer whether another instant is
+being executed by a table service writer. The execute_abort API will perform 
the following steps
+
+1. Start a transaction
+2. Reload active timeline
+3. If instant has been already aborted, exit without throwing exception. 
Cleanup file from /.cancel if still present

Review Comment:
   we might need to account for below scenario. 
   
   our clustering table service can go through multiple attempts in general. 
Which means, there could be multiple rollbacks seen in timeline. so, when 
processing a request for cancellation, we need to deduce whether a rollback 
seen for a cancellable clustering plan happened before the cancel request or 
after. 
   
   So that, we can decide whether to trigger another rollback or just proceed 
onto to moving the state to "abort" 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to