[ 
https://issues.apache.org/jira/browse/HUDI-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davis Zhang updated HUDI-9073:
------------------------------
    Description: 
h2. Motivation

Back in Hudi 0.x, all instants were ordered by "request time", which marks the 
"initiation of a given write operation". Hudi 1.x introduces "completion 
time", which marks the point where the "changes applied by the write operation 
are committed and visible to readers/writers".
 
From a standard DBMS point of view, concurrent readers and writers should only 
ever see "completion time based" event ordering. The notion of "request time" 
may well serve some purpose for Hudi in coordinating internal state across 
table services and other writers. Yet leaking "request time" to Hudi consumers 
is a design mistake, as what they actually care about is completion time 
ordering.
 
As of today, various band-aids have been applied to fill the gap: we 
introduced hollow commit handling and also "state transition time ordering". 
These ugly pieces add unnecessary complexity and impair maintainability and 
dev velocity.
 
h3. Ideal end state

In 99.99% of cases we should use completion time based ordering. This means all 
V2 instant generators use CompletionTimeComparator.
 
We should revisit all logic that relies on request time based ordering (usages 
of the request time comparator) and replace it with completion time based 
ordering. Where that is not possible, document in the code why the request 
time is needed and why it cannot be avoided.

 

For any event ordering mode that changes from request time based to completion 
time based, we need to make sure "exactly once processing" semantics are not 
lost during the transition. A potential proposal is as follows.

 

 
This doc closes one TODO item of the design from @Ethan Guo.
h3. Motivation & problem statement
In Hudi 0.x, the checkpoint of a delta streamer is based on the requested time. 
In 1.x it requires the completion time. We need to design a migration protocol 
so that the version upgrade adds no extra operational overhead and causes no 
data correctness issues. The protocol must guarantee:
 
- *[No duplication]* No completed instant will be applied twice.
- *[No loss of update]* No completed instant will be missed.
- *[Clean migration]* After the [Transition completion period], we strictly 
apply changes according to instant completion time, with no extra migration 
overhead.
h3. The migration protocol design
For checkpoint translation, I came up with a protocol of O(1) space and time 
complexity that applies when the hollow commit handling strategy is configured 
to "fail" or "block".
 
From the user's perspective, it looks as follows:
- *[Start whenever we want]* Turn on the feature flag and we immediately switch 
to completion time based ordering.
- *[Transition period]* For a transition period, the extra migration code path 
is running.
- *[Transition completion period]* No extra code related to checkpoint 
interpretation is running; we run completion time based checkpointing cleanly.
 
Guarantee delivered by the "fail"/"block" strategies of hollow commit handling: 
if we have processed up to requested time instant_x1_requested_time, all 
instants with a smaller requested time are already applied, and those with a 
larger requested time are not.
 
We can simply use this rule to ensure [No duplication] and [No loss of update]. 
For [Clean migration], the protocol includes a termination check so that the 
migration logic stops running once it is no longer needed.
 
h3. Protocol:
 
What we start with:
- A requested time based checkpoint `instant_x1_requested_time` of the source 
table.
- The active timeline of the source table.
 
After turning on the feature flag, in the next delta commit (round 1) we do:
1. Get the max instant completion time MAX_COMPLETION_TIME.
2. Start from the first completed instant after instant_x1_requested_time and 
apply instants in order of their completion time, filtering with the function
{code:java}
boolean shouldIgnoreCompletionTime(String completionTime) {
  // Open question: what if the instant has already been archived?
  String requestTime = getInstantRequestedTime(completionTime);
  // Already covered by the request time based checkpoint, so skip it.
  return requestTime.compareTo(instant_x1_requested_time) <= 0;
}
{code}
For a few subsequent delta commits, we just behave like completion time 
ordering with the extra filtering of step 2.
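 
The round 1 selection can be sketched roughly as below. This is only an 
illustration under stated assumptions: `MigrationFilterSketch`, 
`CompletedInstant`, `shouldIgnore` and `instantsToApply` are hypothetical names 
rather than Hudi APIs, and timestamps are assumed to be fixed-width strings so 
that lexicographic comparison matches time order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch; none of these names are Hudi APIs.
class MigrationFilterSketch {

  // Minimal stand-in for a completed instant on the source timeline.
  static final class CompletedInstant {
    final String requestedTime;
    final String completionTime;

    CompletedInstant(String requestedTime, String completionTime) {
      this.requestedTime = requestedTime;
      this.completionTime = completionTime;
    }
  }

  // Mirrors the filter of step 2: an instant whose requested time is at or
  // before the V1 checkpoint was already applied and must be skipped.
  static boolean shouldIgnore(CompletedInstant instant, String ckpV1RequestedTime) {
    return instant.requestedTime.compareTo(ckpV1RequestedTime) <= 0;
  }

  // Returns the instants still to be applied, ordered by completion time.
  static List<CompletedInstant> instantsToApply(
      List<CompletedInstant> completed, String ckpV1RequestedTime) {
    List<CompletedInstant> result = new ArrayList<>();
    for (CompletedInstant instant : completed) {
      if (!shouldIgnore(instant, ckpV1RequestedTime)) {
        result.add(instant);
      }
    }
    result.sort(Comparator.comparing((CompletedInstant i) -> i.completionTime));
    return result;
  }

  public static void main(String[] args) {
    List<CompletedInstant> completed = Arrays.asList(
        new CompletedInstant("001", "002"),
        new CompletedInstant("003", "004"),
        new CompletedInstant("005", "009"),
        new CompletedInstant("006", "007"));
    for (CompletedInstant i : instantsToApply(completed, "003")) {
      System.out.println(i.requestedTime + " completed at " + i.completionTime);
    }
  }
}
```

With a V1 checkpoint of "003", the instants requested at "001" and "003" are 
skipped, and the remaining two are applied in completion order: the one 
requested at "006" first, then the one requested at "005".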
 
Once we hit a point where the next completionTime to be processed > 
MAX_COMPLETION_TIME, we are fully switched to completion time ordering. No 
extra filtering is required, and no extra state such as 
MAX_COMPLETION_TIME/instant_x1_requested_time needs to be tracked. In that 
round of ingestion, we only need to write checkpointV2, which is the 
completion time.
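 
A minimal sketch of this termination check follows. As I read the "fail"/"block" 
guarantee, every instant with a requested time at or before 
instant_x1_requested_time had already completed when MAX_COMPLETION_TIME was 
captured, so anything completing later cannot have been applied before and the 
filter has nothing left to skip. `MigrationTerminationSketch` and 
`isMigrationDone` are hypothetical names, and timestamps are again assumed to 
be fixed-width strings.

```java
// Hypothetical sketch; the class and method names are illustrative, not Hudi APIs.
class MigrationTerminationSketch {

  // True once the next completion time to process lies strictly beyond the
  // MAX_COMPLETION_TIME captured in round 1.
  static boolean isMigrationDone(String nextCompletionTime, String maxCompletionTime) {
    return nextCompletionTime.compareTo(maxCompletionTime) > 0;
  }

  public static void main(String[] args) {
    // Still inside the transition period: the filter must keep running.
    System.out.println(isMigrationDone("007", "009"));
    // Past MAX_COMPLETION_TIME: only checkpointV2 needs to be written.
    System.out.println(isMigrationDone("010", "009"));
  }
}
```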
 
 
h3. Example
 
-----
The commit metadata looks as follows.
We start with the feature flag off, where we only have checkpointV1, which is 
request time based.
 
{ ckpV1: instant_x1_requested_time }
 
After turning on the flag, the first round of ingestion writes metadata like 
the following. We know we are in the first round because the metadata we read 
has only the ckpV1 attribute and no maxCompletionTime attribute.
{
  ckpV1: instant_x1_requested_time
  ckpv1v2mgration_maxCompletionTime: MAX_COMPLETION_TIME
  ckpV2: derived based on the filtering + normal completion time processing logic
}
 
Once we have processed instants with completion time beyond 
MAX_COMPLETION_TIME, we are fully switched. The metadata we write is
{ ckpV2: normal completion time based ordering. }
 
We know we are fully migrated because there is only the ckpV2 attribute and no 
maxCompletionTime attribute.
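 
The attribute checks described in this example can be sketched as a small 
phase lookup. This is a hedged illustration only: `CheckpointPhaseSketch`, 
`Phase` and `phaseOf` are hypothetical names, the commit metadata is modeled 
as a plain string map, and the keys are the placeholder attribute names used 
in the example above rather than confirmed Hudi metadata keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not a Hudi API. Keys are the example's placeholder names.
class CheckpointPhaseSketch {

  enum Phase { FIRST_ROUND, IN_TRANSITION, MIGRATED }

  static final String CKP_V1 = "ckpV1";
  static final String CKP_V2 = "ckpV2";
  static final String MAX_COMPLETION = "ckpv1v2mgration_maxCompletionTime";

  // Derives the migration phase from which attributes the last commit wrote.
  static Phase phaseOf(Map<String, String> commitMetadata) {
    if (commitMetadata.containsKey(MAX_COMPLETION)) {
      // Migration state is still tracked, so the extra filter keeps running.
      return Phase.IN_TRANSITION;
    }
    if (commitMetadata.containsKey(CKP_V1)) {
      // Only the request time based checkpoint exists: this is round 1.
      return Phase.FIRST_ROUND;
    }
    if (commitMetadata.containsKey(CKP_V2)) {
      // Only the completion time based checkpoint exists: fully migrated.
      return Phase.MIGRATED;
    }
    throw new IllegalArgumentException("No checkpoint attribute found");
  }

  public static void main(String[] args) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put(CKP_V1, "instant_x1_requested_time");
    System.out.println(phaseOf(metadata));
    metadata.put(MAX_COMPLETION, "MAX_COMPLETION_TIME");
    metadata.put(CKP_V2, "some_completion_time");
    System.out.println(phaseOf(metadata));
  }
}
```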


> Deprecate request time ordering with commit time ordering in 1.x
> ----------------------------------------------------------------
>
>                 Key: HUDI-9073
>                 URL: https://issues.apache.org/jira/browse/HUDI-9073
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Davis Zhang
>            Assignee: Y Ethan Guo
>            Priority: Major
>             Fix For: 1.1.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
