https://issues.apache.org/jira/browse/MAPREDUCE-7282
"MR v2 commit algorithm is dangerous, should be deprecated and not the default" someone do a PR to change the default & if it doesn't break too much I'l merge it On Mon, 29 Jun 2020 at 13:20, Steve Loughran <ste...@cloudera.com> wrote: > v2 does a file-by-file copy to the dest dir in task commit; v1 promotes > task attempts to job attempt dir by dir rename, job commit lists those and > moves the contents > > if the worker fails during task commit -the next task attempt has to > replace every file -so it had better use the same filenames. > > The really scary issue is a network partition: if the first worker went > off-line long enough for a second attempt to commit (If speculation has > enabled that may not be very long at all as could already be waiting) then > if the second worker goes online again it may continue with its commit and > partially overwrite some but not all of the output. > > That task commit is not atomic even though spark requires this. It is > worse on Amazon S3 because rename is O(data). The window for failure is a > lot longer. > > The S3A committers don't commit their work until job commit; while that is > non-atomic (nor is MR v1 BTW) it's time is |files|/(min(|threads|, > max-http-pool-size)) > > The EMR spark committer does actually commit its work in task commit, so > is also vulnerable. I wish they copied more of our ASF-licensed code :). Or > some of IBM's stocator work. > > > Presumably their algorithm is > > pre-task-reporting ready-to-commit: upload files from the localfd task > attempt staging dir to dest dir, without completing the upload. You could > actually do this with a scanning thread uploading as you go along. > task commit: POST all the uploads > job commit: touch _SUCCESS > > The scales better (no need to load & commit uploads in job commit) and > does not require any consistent cluster FS. And is faster. > > But again: the failure semantic of task commit isn't what spark expects. > > Bonus fun: google GCS dir commit is file-by-file so non atomic; v1 task > commit does expect an atomic dir rename. So you may as well use v2. > > They could add a committer which didn't do that rename, just write a > manifest file to the job attempt dir pointing to the successful task > attempt; commit that with their atomic file rename. The committer plugin > point in MR lets you declare a committer factory for each FS, so it could > be done without any further changes to spark. > > On Thu, 25 Jun 2020 at 22:38, Waleed Fateem <waleed.fat...@gmail.com> > wrote: > >> I was trying to make my email short and concise, but the rationale behind >> setting that as 1 by default is because it's safer. With algorithm version >> 2 you run the risk of having bad data in cases where tasks fail or even >> duplicate data if a task fails and succeeds on a reattempt (I don't know if >> this is true for all OutputCommitters that extend the FileOutputCommitter >> or not). >> >> Imran and Marcelo also discussed this here: >> >> https://issues.apache.org/jira/browse/SPARK-20107?focusedCommentId=15945177&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15945177 >> >> I also did discuss this a bit with Steve Loughran and his opinion was >> that v2 should just be deprecated all together. I believe he was going to >> bring that up with the Hadoop developers. >> >> >> On Thu, Jun 25, 2020 at 3:56 PM Sean Owen <sro...@gmail.com> wrote: >> >>> I think is a Hadoop property that is just passed through? 
On Thu, 25 Jun 2020 at 22:38, Waleed Fateem <waleed.fat...@gmail.com> wrote:

I was trying to keep my email short and concise, but the rationale behind setting it to 1 by default is that it's safer. With algorithm version 2 you run the risk of having bad data in cases where tasks fail, or even duplicate data if a task fails and then succeeds on a reattempt (I don't know whether this is true for all OutputCommitters that extend FileOutputCommitter or not).

Imran and Marcelo also discussed this here:

https://issues.apache.org/jira/browse/SPARK-20107?focusedCommentId=15945177&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15945177

I also discussed this a bit with Steve Loughran, and his opinion was that v2 should just be deprecated altogether. I believe he was going to bring that up with the Hadoop developers.

On Thu, Jun 25, 2020 at 3:56 PM Sean Owen <sro...@gmail.com> wrote:

I think this is a Hadoop property that is just passed through? If the default is different in Hadoop 3, we could mention that in the docs. I don't know if we want to always set it to 1 as a Spark default, even in Hadoop 3, right?

On Thu, Jun 25, 2020 at 2:43 PM Waleed Fateem <waleed.fat...@gmail.com> wrote:

Hello!

I noticed that the documentation, starting with 2.2.0, states that the parameter spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version is 1 by default:
https://issues.apache.org/jira/browse/SPARK-20107

I don't actually see this being set anywhere explicitly in the Spark code, so the documentation isn't entirely accurate if you run in an environment that has MAPREDUCE-6406 implemented (starting with Hadoop 3.0).

The default version was explicitly set to 2 in the FileOutputCommitter class, so any output committer that inherits from this class (ParquetOutputCommitter, for example) would use v2 in a Hadoop 3.0 environment and v1 in older Hadoop environments.

Would it make sense for us to consider setting v1 as the default in code in case the configuration was not set by a user?

Regards,

Waleed
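Until Spark sets a default in code, pinning v1 explicitly on the application side is straightforward. A minimal sketch using the property discussed in this thread (app name is illustrative):

    import org.apache.spark.sql.SparkSession

    // Pin the v1 commit algorithm explicitly: on Hadoop 3 (MAPREDUCE-6406),
    // FileOutputCommitter defaults to v2 when this property is left unset,
    // regardless of what the Spark documentation states.
    val spark = SparkSession.builder()
      .appName("pin-commit-algorithm-v1")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
      .getOrCreate()

The same property can be set in spark-defaults.conf or via --conf on spark-submit.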