kbuci commented on code in PR #11593:
URL: https://github.com/apache/hudi/pull/11593#discussion_r1706327415


##########
rfc/rfc-79/rfc-79.md:
##########
@@ -0,0 +1,138 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-79: Robust Spark Writes
+
+## Proposers
+
+- @nsivabalan
+- @nbalajee
+
+## Approvers
+ - @vinoth
+ - @codope
+
+## Status
+
+JIRA: https://issues.apache.org/jira/browse/HUDI-7967
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Apache Hudi is a transactional data lake platform that helps users write to and read from Hadoop-compatible file systems. Hudi guarantees ACID semantics to ensure no dirty writes are exposed to concurrent readers and that writes are either entirely complete or rolled back. Hudi is designed to run on distributed compute engines like Spark and Flink. With Spark especially, task retries and stage retries can occur that are beyond Hudi's control, so Hudi's design and implementation have to ensure writes are resilient to such task and stage retries. Hudi should guarantee no data consistency issues arise from any of these failure/retry scenarios. However, we have lately identified edge cases that call for this RFC: building a robust, foolproof mechanism to ensure Hudi is resilient to such task and stage failures and retries.
+
+
+## Background
+Spark task retries are not new to Hudi writes, and we have already designed a marker-based mechanism to tackle them. See [this](https://hudi.apache.org/docs/markers) page for more detail on markers in Hudi and how they ensure spurious data files are accounted for during Hudi writes.
+
+### Gaps
+Even though marker-based reconciliation tackles the most common task retry scenarios, there are a few edge cases that the existing marker-based reconciliation may not handle. Let's take a look at the sequence of events in our write pipeline and go from there.
+
+![](./writes.png)
+
+_Reference diagram highlighting the write skeleton in Hudi_
+
+
+1. Reading from source
+2. Deduping
+3. Indexing
+4. Partitioning + spark file handling
+5. I/O or writes (write handles)
+6. Marker reconciliation
+7. Prepare commit metadata
+8. Write to metadata table
+9. Complete the write to data table (marking commit as complete)
+
+
+Here the major failure scenarios are around steps 5, 6, 7, and 8. If a spark task (or a subset of tasks) is attempted twice in step 5, we could see two data files written to disk, and we should be able to find markers for both attempts. During marker-based reconciliation, we should be able to account for the spurious data file and delete it.
+
+#### Case 1: Stray executor
+
+1 -> 2 … 5 (writes) ->
+The write stage completes, and spark triggers a kill for any additional tasks that are still running. Let's say that due to an executor failure or spark speculative execution, a second attempt of a task was triggered, while attempt 1 of the task completed and its result was sent back to the driver when the write stage completed. Hence we have a stray task still running. At the end of the stage, spark triggers a kill for any stray tasks, but due to a networking glitch the task may be unreachable and keep running. During marker-based reconciliation, we might see the marker file, but the data file could be missing because the task is still making progress. When the task eventually completes, the spurious data file could be added to storage; even the commit metadata may not be tracking this spurious data file.
+We heard from Uber that such a stray executor could sometimes still be running even after the spark application has completed (a very rare case).
+
+#### Case 2: DAG re-trigger while writing to MDT (spark RDD cache invalidation)
+The issue is exacerbated with RLI.
+
+1 -> 2 … 5 (writes) -> 6 (marker-based reconciliation) ->
+After marker-based reconciliation is complete, while trying to write to the MDT, say the DAG is re-triggered due to spark RDD cache invalidation. The write handles could then be re-executed, but the reconciliation may not be executed again, which could lead to duplicate data files on storage. In fact, our commit metadata could be tracking one set of files while the MDT writes track a different set.
+
+
+## Design
+
+We are proposing two additional markers to assist with detecting and thwarting spurious data files in Hudi at any cost.
+### 1. Completion Marker
+The marker mechanism discussed above is, strictly speaking, a begin marker. So let's add a completion marker at the end of writes (in the write handle) for each data file produced, to mark the completion of the write for a given task in addition to the begin marker. The completion marker will only track the fileId and commit time (the write token is ignored), and the writeStatus gets serialized as its content. This is in contrast to the begin markers, where the marker tracks the actual data file name.
+
+Write handle changes:
+In the constructor:
+1. If a completion marker is already present for the given fileId and commit time, read the CM (completion marker) to deserialize the write status and return right away (short circuit).
+2. If the completion marker is not present, go ahead with writing the data file(s).
+3. At the end, just before returning the writeStatus back to the driver, check again for the CM:
+   - If present, delete the newly created file, read the CM to deserialize the write status, and return that WriteStatus.
+   - If not, create the CM and return the WriteStatus. (Alternatively, spark could have aborted the task by then, in which case there won't be any data file at all.)
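The constructor/return-path checks above could be sketched roughly as follows. This is a minimal, single-process illustration, not Hudi's actual write handle API; the class name, method names, CM naming scheme, and status payload are all invented for the sketch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical write handle demonstrating the completion-marker short circuit.
class CompletionMarkerWriteHandle {
    private final Path markerDir;
    private final String fileId;
    private final String commitTime;

    CompletionMarkerWriteHandle(Path markerDir, String fileId, String commitTime) {
        this.markerDir = markerDir;
        this.fileId = fileId;
        this.commitTime = commitTime;
    }

    // The CM name ignores the write token, so every attempt for the same
    // (fileId, commitTime) resolves to the same marker path.
    private Path cmPath() {
        return markerDir.resolve(fileId + "_" + commitTime + ".cm");
    }

    // Returns the serialized WriteStatus (a placeholder byte[] here).
    byte[] write(byte[] records, Path dataFile) throws IOException {
        // Step 1: short-circuit if a CM from an earlier attempt exists.
        if (Files.exists(cmPath())) {
            return Files.readAllBytes(cmPath());
        }
        // Step 2: no CM yet, write the data file.
        Files.write(dataFile, records);
        // Step 3: re-check just before returning; another attempt may have won.
        if (Files.exists(cmPath())) {
            Files.delete(dataFile);              // drop our duplicate file
            return Files.readAllBytes(cmPath()); // return the winner's status
        }
        // We are first: persist the (placeholder) WriteStatus as CM content.
        byte[] status = ("status-for-" + fileId).getBytes();
        Files.write(cmPath(), status);
        return status;
    }
}
```

A retried attempt calling `write` with a different write token but the same fileId/commit time either returns early at step 1 or deletes its own file at step 3, so at most one data file and one WriteStatus survive per file slice.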
+
+Most spark task retries and executor losses should be taken care of by marker-based reconciliation even without the completion marker fix. The above design should help with stray executor issues and spark speculative execution.

Review Comment:
   I believe someone had suggested the idea of using the timeline server, since then hypothetically you could have a setup where each file-writing spark task has to get an ACK from the timeline server before starting any file writes, and informs the timeline server when it is done (like a simplified lease). Each task could also perform "heartbeating" against the timeline server. This way you can prevent spark tasks from creating files after the finalize-write check while still decreasing the chance of a single unresponsive task attempt failing or delaying the entire job. Specifically, I was thinking that each file-writing task could follow these steps:
   1. Consume and deserialize all existing CMs, and iterate through records until reaching the first record that isn't part of a CM (and hence needs to be written)
   2. Request the timeline server (TS) for permission for its assigned file-id prefix for the target instant time
   3. Wait for the response from the TS. If an ACK is granted, start a heartbeat with the TS (the heartbeat corresponds to the file-id prefix). If a DENY is received, deserialize any new CMs that may have appeared; if all records have been exhausted, return the deserialized write status, otherwise fail with an exception.
   4. Create a concurrent background task that periodically heartbeats with the TS by polling. If the task detects that its own heartbeat has expired, it will need to close its DFS write handle (not sure of the best way to do this). Then it will check whether any new CMs exist and, if so, continue deserializing them. If all records have been exhausted, the task can return the deserialized WriteStatus; else, it should fail with an exception.
   5. Start writing records into new data files (the usual/existing logic for 
the write handle)
   6. Once all records are written, close the heartbeat and return the WriteStatuses
   
   If each file-writing task does the above, then the finalize write check can 
perform the following steps before starting
   1. Instruct the TS to DENY any new requests for the target instant time
   2. Wait for all heartbeats (listed by TS for the target instant time) to 
expire
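Under these assumptions, the lease bookkeeping (task-side steps 2-4/6 and the finalize-write steps) could be sketched as below. This is a single-process toy: the real timeline server, its RPC layer, and asynchronous heartbeat expiry are all elided, and every name here is invented.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy lease registry standing in for the timeline server's bookkeeping.
class LeaseServer {
    private final Map<String, Long> heartbeats = new ConcurrentHashMap<>();
    private volatile boolean acceptingNewLeases = true;
    private final long expiryMs;

    LeaseServer(long expiryMs) { this.expiryMs = expiryMs; }

    // Task step 2/3: ask for permission for a file-id prefix.
    // true = ACK (heartbeat registered), false = DENY.
    synchronized boolean requestLease(String fileIdPrefix) {
        if (!acceptingNewLeases) return false;
        heartbeats.put(fileIdPrefix, System.currentTimeMillis());
        return true;
    }

    // Task step 4: periodic heartbeat; false once the lease has expired,
    // at which point the task must stop writing and close its handle.
    synchronized boolean heartbeat(String fileIdPrefix) {
        Long last = heartbeats.get(fileIdPrefix);
        if (last == null || System.currentTimeMillis() - last > expiryMs) {
            heartbeats.remove(fileIdPrefix);
            return false;
        }
        heartbeats.put(fileIdPrefix, System.currentTimeMillis());
        return true;
    }

    // Task step 6: finished writing, release the lease.
    synchronized void release(String fileIdPrefix) {
        heartbeats.remove(fileIdPrefix);
    }

    // Finalize-write steps 1-2: stop granting new leases, drop expired
    // heartbeats, and report whether any live lease remains.
    synchronized boolean beginFinalize() {
        acceptingNewLeases = false;
        heartbeats.entrySet().removeIf(
            e -> System.currentTimeMillis() - e.getValue() > expiryMs);
        return heartbeats.isEmpty();
    }
}
```

The finalize step would poll `beginFinalize` until it returns true, after which no stray task can obtain a lease (and hence should not create new files) for that instant time.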
   
   Is there a specific reason why we are not pursuing the timeline server in this manner? I guess there is the issue that a single spark task failure can cause the entire stage/finalize-write check to fail (due to telling the timeline server that it has started working on a file, but getting stuck afterwards). But using heartbeating as mentioned above should minimize this issue, though implementing it will be tricky.
   
   If we are settling on not using the timeline server, then we can try an approach that was brought up during one of the offline discussions: allowing ingestion to write extra/stray files and have them cleaned up later. Specifically, we update the logic for all readers/writers (such as the file system view modules) to perform the flow below when filtering valid files in a data partition
   1. Filter for all files that belong to a valid file group and completed 
instant (as usual)
   2. Group all files by their file group prefix. Find all groups of files where the size of the group is greater than the value of the file group's file-id suffix. Get all file names from these groups; this is essentially a list of files in which some files may be spurious/extra writes.
   3. If this list of files is not empty, then filter out all files that are 
not part of the completed instant file metadata. This way only valid files from 
the writes are returned to the user, and any files created by a stray task will 
be ignored, and eventually deleted by clean.
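Roughly, steps 2-3 could look like the sketch below. Note the suffix-count rule from step 2 is simplified here to flagging duplicate files per (fileId, instant) slot, and the file-naming scheme and helper names are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of stray-file filtering against completed commit metadata.
class StrayFileFilter {
    // Assumed naming scheme: "<fileId>_<writeToken>_<instant>.parquet".
    static String fileId(String name) { return name.split("_")[0]; }
    static String instant(String name) {
        String tail = name.split("_")[2];
        return tail.substring(0, tail.indexOf('.'));
    }

    // committedFiles: file names recorded in completed commit metadata.
    static List<String> validFiles(List<String> listed, Set<String> committedFiles) {
        // Step 2 (simplified): group by (fileId, instant); more than one
        // file for the same slot means at least one write may be stray.
        Map<String, List<String>> bySlot = listed.stream()
            .collect(Collectors.groupingBy(n -> fileId(n) + "@" + instant(n)));
        boolean suspect = bySlot.values().stream().anyMatch(g -> g.size() > 1);
        // Step 3: only consult commit metadata when a stray is suspected,
        // keeping only files the completed instant actually recorded.
        if (!suspect) {
            return listed;
        }
        return listed.stream()
            .filter(committedFiles::contains)
            .collect(Collectors.toList());
    }
}
```

In the common case (no duplicate slots) the commit metadata is never read, matching the goal of only paying the extra cost when there is a sign of a stray write.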
   
   Although this approach would require a lot of refactoring, it will only read the commit metadata if there is a sign that a stray task created a file. Also, it would only affect users reading the DFS directly instead of using the MDT (since a stray task wouldn't have its file added to the MDT).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
