On 21 Feb 2017, at 01:00, Ryan Blue 
<rb...@netflix.com.INVALID<mailto:rb...@netflix.com.INVALID>> wrote:

We just wrote a couple new committers for S3 that we're beginning to roll out 
to our Spark users. I've uploaded a repo with it if you'd like to take a look:

  https://github.com/rdblue/s3committer

The main problem with the UUID approach is that data is live as soon as the S3 
upload completes. That means that readers can get partial results while a job 
is running that may not be eventually committed (since you will remove the UUID 
later). You may also have a problem with partitioned task outputs.


You'd have to encode the task ID in the output file name to identify files to 
roll back in the event you need to revert a task, but if you have partitioned 
output, you have to do a lot of directory listing to find all the files that 
need to be removed. That, or you could risk duplicate data by not rolling back 
tasks.


Bear in mind that recursive directory listing isn't so expensive once you have 
the O(1)-ish listFiles(files, recursive) operation of HADOOP-13208.



The approach we took is to use the multi-part upload API to stage data from 
tasks without issuing the final call to complete the upload and make the data 
live in S3. That way, we get distributed uploads without any visible data until 
the job committer runs. The job committer reads all of the pending uploads and 
commits them. If the job has failed, then it can roll back the known uploads by 
aborting them instead, with the data never visible to readers.


Yes, that's what I've been doing too. I'm choreographing the task and committer 
via data serialized to S3 itself. On a task failure that will allow us to roll 
back all completely written files, without the need for any task-job 
communications. I'm still thinking about having an optional+async scan for 
pending commits to the dest path, to identify problems and keep bills down.



The flaw in this approach is that you can still get partial writes if the 
driver fails while running the job committer, but it covers the other cases.

There's a bit of that in both the FileOutputFormat and indeed, in 
HadoopMapReduceCommitProtocol. It's just a small window, especially if you do 
those final PUTs in parallel


We're working on getting users moved over to the new committers, so now seems 
like a good time to get a copy out to the community. Please let me know what 
you think.

I'll have a look at your code, see how it compares to mine. I'm able to take 
advantage of the fact that we can tune the S3A FS, for example, by modifying 
the block output stream to *not* commit its work in the final close()

https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

This means that provided the output writer doesn't attempt to read the file 
it's just written, we can do a write straight to the final destination


What your patch has made me realise is that I could also do a delayed-commit 
copy by reading in a file, doing a multipart put to its final destination, and 
again, postponing the final commit. this is something which tasks could do in 
their commit rather than a normal COPY+DELETE  rename, passing the final 
pending commit information to the job committer. This'd make the rename() 
slower as it will read and write the data again, rather than the 6-10 MB/s of 
in-S3 copies, but as these happen in-task-commit, rather than in-job-commit, 
they slow down the overall job less. That could be used for the absolute path 
commit phase.


-Steve

rb

On Mon, Feb 20, 2017 at 10:14 AM, Matthew Schauer 
<matthew.scha...@ibm.com<mailto:matthew.scha...@ibm.com>> wrote:
I'm using Spark 1.5.2 and trying to append a data frame to partitioned
Parquet directory in S3.  It is known that the default
`ParquetOutputCommitter` performs poorly in S3 because move is implemented
as copy/delete, but the `DirectParquetOutputCommitter` is not safe to use
for append operations in case of failure.  I'm not very familiar with the
intricacies of job/task committing/aborting, but I've written a rough
replacement output committer that seems to work.  It writes the results
directly to their final locations and uses the write UUID to determine which
files to remove in the case of a job/task abort.  It seems to be a workable
concept in the simple tests that I've tried.  However, I can't make Spark
use this alternate output committer because the changes in SPARK-8578
categorically prohibit any custom output committer from being used, even if
it's safe for appending.  I have two questions: 1) Does anyone more familiar
with output committing have any feedback on my proposed "safe" append
strategy, and 2) is there any way to circumvent the restriction on append
committers without editing and recompiling Spark?  Discussion of solutions
in Spark 2.1 is also welcome.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Output-Committers-for-S3-tp21033.html
Sent from the Apache Spark Developers List mailing list archive at 
Nabble.com<http://Nabble.com>.

---------------------------------------------------------------------
To unsubscribe e-mail: 
dev-unsubscr...@spark.apache.org<mailto:dev-unsubscr...@spark.apache.org>




--
Ryan Blue
Software Engineer
Netflix

Reply via email to