https://issues.apache.org/jira/browse/MAPREDUCE-7282

"MR v2 commit algorithm is dangerous, should be deprecated and not the
default"

someone do a PR to change the default & if it doesn't break too much I'll
merge it
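
For anyone looking at what such a PR would involve: presumably it is the
one-line default in FileOutputCommitter. The constant names below are the
real ones; treat the exact diff as a sketch, since MAPREDUCE-6406 is what
set the default to 2:

  // org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
  public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
      "mapreduce.fileoutputcommitter.algorithm.version";
  // MAPREDUCE-6406 made the default 2; reverting to 1 restores the
  // safer, rename-based v1 algorithm
  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;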



On Mon, 29 Jun 2020 at 13:20, Steve Loughran <ste...@cloudera.com> wrote:

> v2 does a file-by-file copy to the dest dir in task commit; v1 promotes a
> task attempt to the job attempt dir with a single dir rename, and job commit
> lists those dirs and moves the contents.
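>
> Roughly, the layout and promotion steps (a simplified sketch of the
> FileOutputCommitter paths, not the exact code):
>
>   dest/_temporary/$appAttempt/_temporary/$taskAttempt/part-00000
>
>   v1 task commit: rename(taskAttemptDir -> dest/_temporary/$appAttempt/$taskId)  # one dir rename
>   v1 job commit:  list the committed task dirs, merge their contents under dest/
>   v2 task commit: for each file, rename(file -> dest/...)                        # file-by-file
>   v2 job commit:  write dest/_SUCCESS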
>
> if the worker fails during task commit, the next task attempt has to
> replace every file, so it had better use the same filenames.
>
> The really scary issue is a network partition: if the first worker went
> off-line long enough for a second attempt to commit (if speculation is
> enabled that may not be very long at all, as a second attempt could already
> be waiting), then if the first worker comes back online it may continue with
> its commit and partially overwrite some, but not all, of the output.
>
> That task commit is not atomic even though Spark requires it to be. It is
> worse on Amazon S3, because rename is O(data): the window for failure is a
> lot longer.
>
> The S3A committers don't commit their work until job commit; while that is
> non-atomic (nor is MR v1's, BTW), its time is |files| / min(|threads|,
> max-http-pool-size).
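>
> (Worked example, with made-up numbers: 10,000 files, 40 committer threads
> and an HTTP pool of 96 connections gives min(40, 96) = 40, so job commit
> takes roughly 10,000 / 40 = 250 sequential completion POSTs' worth of time.)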
>
> The EMR Spark committer does actually commit its work in task commit, so it
> is also vulnerable. I wish they copied more of our ASF-licensed code :). Or
> some of IBM's Stocator work.
>
>
> Presumably their algorithm is:
>
> pre-task-reporting ready-to-commit: upload files from the local FS task
> attempt staging dir to the dest dir, without completing the uploads. You
> could actually do this with a scanning thread, uploading as you go along.
> task commit: POST the completion of all the uploads (sketched below)
> job commit: touch _SUCCESS
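>
> A minimal sketch of that flow, assuming the mechanism is S3 multipart
> uploads (an incomplete multipart upload is invisible until the completion
> POST). This uses the AWS SDK for Java v1; the class and method structure is
> illustrative, not EMR's actual code:
>
>   import com.amazonaws.services.s3.AmazonS3;
>   import com.amazonaws.services.s3.AmazonS3ClientBuilder;
>   import com.amazonaws.services.s3.model.*;
>   import java.io.File;
>   import java.util.Collections;
>   import java.util.List;
>
>   class PendingUpload {
>     final String key, uploadId;
>     final List<PartETag> etags;
>     PendingUpload(String key, String uploadId, List<PartETag> etags) {
>       this.key = key; this.uploadId = uploadId; this.etags = etags;
>     }
>   }
>
>   class CommitSketch {
>     static final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
>
>     // ready-to-commit: push the bytes to the final key, but don't complete
>     // the upload; nothing becomes visible at the key yet
>     static PendingUpload start(String bucket, String key, File file) {
>       InitiateMultipartUploadResult init = s3.initiateMultipartUpload(
>           new InitiateMultipartUploadRequest(bucket, key));
>       UploadPartResult part = s3.uploadPart(new UploadPartRequest()
>           .withBucketName(bucket).withKey(key)
>           .withUploadId(init.getUploadId())
>           .withPartNumber(1)
>           .withFile(file));
>       return new PendingUpload(key, init.getUploadId(),
>           Collections.singletonList(part.getPartETag()));
>     }
>
>     // task commit: one POST per file completes the upload; this is the
>     // non-atomic window if two attempts race
>     static void complete(String bucket, PendingUpload p) {
>       s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
>           bucket, p.key, p.uploadId, p.etags));
>     }
>   }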
>
> This scales better (no need to load & commit uploads in job commit), does
> not require any consistent cluster FS, and is faster.
>
> But again: the failure semantics of task commit aren't what Spark expects.
>
> Bonus fun: Google GCS dir rename is file-by-file, so non-atomic; v1 task
> commit does expect an atomic dir rename. So you may as well use v2.
>
> They could add a committer which didn't do that rename, but instead wrote a
> manifest file to the job attempt dir pointing to the successful task
> attempt, and committed that with their atomic file rename. The committer
> plugin point in MR lets you declare a committer factory for each FS, so it
> could be done without any further changes to Spark (see the sketch below).
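>
> A sketch of that plugin-point binding, in spark.hadoop.* passthrough form
> (the s3a entry is the factory hadoop-aws actually ships; the gs entry is
> hypothetical, naming a committer factory that doesn't exist):
>
>   spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
>   spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs=com.example.gcs.ManifestCommitterFactory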
>
> On Thu, 25 Jun 2020 at 22:38, Waleed Fateem <waleed.fat...@gmail.com>
> wrote:
>
>> I was trying to make my email short and concise, but the rationale behind
>> setting that to 1 by default is that it's safer. With algorithm version 2
>> you run the risk of having bad data in cases where tasks fail, or even
>> duplicate data if a task fails and then succeeds on a reattempt (I don't
>> know if this is true for all OutputCommitters that extend
>> FileOutputCommitter or not).
>>
>> Imran and Marcelo also discussed this here:
>>
>> https://issues.apache.org/jira/browse/SPARK-20107?focusedCommentId=15945177&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15945177
>>
>> I also did discuss this a bit with Steve Loughran, and his opinion was
>> that v2 should just be deprecated altogether. I believe he was going to
>> bring that up with the Hadoop developers.
>>
>>
>> On Thu, Jun 25, 2020 at 3:56 PM Sean Owen <sro...@gmail.com> wrote:
>>
>>> I think this is a Hadoop property that is just passed through? If the
>>> default is different in Hadoop 3 we could mention that in the docs. I
>>> don't know if we want to always set it to 1 as a Spark default, even
>>> in Hadoop 3, right?
>>>
>>> On Thu, Jun 25, 2020 at 2:43 PM Waleed Fateem <waleed.fat...@gmail.com>
>>> wrote:
>>> >
>>> > Hello!
>>> >
>>> > I noticed that in the documentation starting with 2.2.0 it states that
>>> > the parameter spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
>>> > is 1 by default:
>>> > https://issues.apache.org/jira/browse/SPARK-20107
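>>> >
>>> > A minimal example of pinning that explicitly, via the usual
>>> > spark.hadoop.* passthrough, so the Hadoop default no longer matters:
>>> >
>>> >   spark-submit \
>>> >     --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 \
>>> >     ...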
>>> >
>>> > I don't actually see this being set anywhere explicitly in the Spark
>>> > code, so the documentation isn't entirely accurate if you run in an
>>> > environment that has MAPREDUCE-6406 implemented (starting with Hadoop 3.0).
>>> >
>>> > The default version was explicitly set to 2 in the FileOutputCommitter
>>> > class, so any output committer that inherits from this class
>>> > (ParquetOutputCommitter, for example) would use v2 in a Hadoop 3.0
>>> > environment and v1 in older Hadoop environments.
>>> >
>>> > Would it make sense for us to consider setting v1 as the default in
>>> > code, in case the configuration was not set by a user?
>>> >
>>> > Regards,
>>> >
>>> > Waleed
>>>
>>
