On 21 Feb 2017, at 01:00, Ryan Blue <rb...@netflix.com.INVALID> wrote:
> We just wrote a couple new committers for S3 that we're beginning to roll out to our Spark users. I've uploaded a repo with it if you'd like to take a look:
>
> https://github.com/rdblue/s3committer
>
> The main problem with the UUID approach is that data is live as soon as the S3 upload completes. That means that readers can get partial results while a job is running that may not be eventually committed (since you will remove the UUID later). You may also have a problem with partitioned task outputs. You'd have to encode the task ID in the output file name to identify files to roll back in the event you need to revert a task, but if you have partitioned output, you have to do a lot of directory listing to find all the files that need to be removed. That, or you could risk duplicate data by not rolling back tasks.

Bear in mind that recursive directory listing isn't so expensive once you have the O(1)-ish listFiles(files, recursive) operation of HADOOP-13208.

> The approach we took is to use the multi-part upload API to stage data from tasks without issuing the final call to complete the upload and make the data live in S3. That way, we get distributed uploads without any visible data until the job committer runs. The job committer reads all of the pending uploads and commits them. If the job has failed, then it can roll back the known uploads by aborting them instead, with the data never visible to readers.

Yes, that's what I've been doing too. I'm choreographing the task and committer via data serialized to S3 itself. On a task failure that will allow us to roll back all completely written files, without the need for any task-job communications. I'm still thinking about having an optional+async scan for pending commits to the dest path, to identify problems and keep bills down.

> The flaw in this approach is that you can still get partial writes if the driver fails while running the job committer, but it covers the other cases.
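
For readers following the thread, the delayed-commit idea described above can be sketched roughly as follows. This is an illustrative in-memory model in Python, not code from either committer and not the real S3 API: `FakeS3`, `task_write`, and `job_commit` are hypothetical names, and the dictionary of "pending" uploads merely stands in for multipart uploads that have been started but not completed.

```python
import uuid


class FakeS3:
    """In-memory stand-in for a bucket with multipart uploads (hypothetical, not the AWS API)."""

    def __init__(self):
        self.visible = {}  # key -> bytes: objects that readers can see
        self.pending = {}  # upload_id -> (key, [parts]): staged, not yet visible

    def initiate_upload(self, key):
        upload_id = uuid.uuid4().hex
        self.pending[upload_id] = (key, [])
        return upload_id

    def upload_part(self, upload_id, data):
        self.pending[upload_id][1].append(data)

    def complete_upload(self, upload_id):
        # Only this call makes the object visible to readers.
        key, parts = self.pending.pop(upload_id)
        self.visible[key] = b"".join(parts)

    def abort_upload(self, upload_id):
        # Discards the staged parts; nothing ever becomes visible.
        self.pending.pop(upload_id, None)


def task_write(s3, key, chunks):
    """Task side: upload every part but do NOT complete; return the pending commit."""
    upload_id = s3.initiate_upload(key)
    for chunk in chunks:
        s3.upload_part(upload_id, chunk)
    return {"key": key, "upload_id": upload_id}


def job_commit(s3, pending_commits, job_succeeded):
    """Job side: complete every pending upload on success, abort them all on failure."""
    for commit in pending_commits:
        if job_succeeded:
            s3.complete_upload(commit["upload_id"])
        else:
            s3.abort_upload(commit["upload_id"])


s3 = FakeS3()
commits = [
    task_write(s3, "out/part-0000", [b"a", b"b"]),
    task_write(s3, "out/part-0001", [b"c"]),
]
assert s3.visible == {}  # nothing is readable before the job commit
job_commit(s3, commits, job_succeeded=True)
assert s3.visible == {"out/part-0000": b"ab", "out/part-0001": b"c"}
```

The loop in `job_commit` also shows where the partial-write window sits: if the process running it dies partway through, the uploads already completed are visible while the rest are still pending.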

There's a bit of that in both the FileOutputFormat and, indeed, in HadoopMapReduceCommitProtocol. It's just a small window, especially if you do those final PUTs in parallel.

> We're working on getting users moved over to the new committers, so now seems like a good time to get a copy out to the community. Please let me know what you think.

I'll have a look at your code, see how it compares to mine. I'm able to take advantage of the fact that we can tune the S3A FS, for example, by modifying the block output stream to *not* commit its work in the final close():

https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

This means that, provided the output writer doesn't attempt to read the file it's just written, we can do a write straight to the final destination.

What your patch has made me realise is that I could also do a delayed-commit copy by reading in a file, doing a multipart put to its final destination, and again, postponing the final commit. This is something which tasks could do in their commit rather than a normal COPY+DELETE rename, passing the final pending commit information to the job committer. This'd make the rename() slower, as it will read and write the data again rather than using the 6-10 MB/s in-S3 copies, but as these happen in-task-commit, rather than in-job-commit, they slow down the overall job less. That could be used for the absolute path commit phase.

-Steve

> rb
>
> On Mon, Feb 20, 2017 at 10:14 AM, Matthew Schauer <matthew.scha...@ibm.com> wrote:
>
>> I'm using Spark 1.5.2 and trying to append a data frame to a partitioned Parquet directory in S3. It is known that the default `ParquetOutputCommitter` performs poorly in S3 because move is implemented as copy/delete, but the `DirectParquetOutputCommitter` is not safe to use for append operations in case of failure.
>>
>> I'm not very familiar with the intricacies of job/task committing/aborting, but I've written a rough replacement output committer that seems to work. It writes the results directly to their final locations and uses the write UUID to determine which files to remove in the case of a job/task abort. It seems to be a workable concept in the simple tests that I've tried. However, I can't make Spark use this alternate output committer because the changes in SPARK-8578 categorically prohibit any custom output committer from being used, even if it's safe for appending.
>>
>> I have two questions: 1) Does anyone more familiar with output committing have any feedback on my proposed "safe" append strategy, and 2) is there any way to circumvent the restriction on append committers without editing and recompiling Spark? Discussion of solutions in Spark 2.1 is also welcome.
>>
>> --
>> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Output-Committers-for-S3-tp21033.html
>> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> --
> Ryan Blue
> Software Engineer
> Netflix