sorry, not noticed this followup. Been busy with other issues On 3 Apr 2018, at 11:19, cane <zhoukang199...@gmail.com<mailto:zhoukang199...@gmail.com>> wrote:
Now, if we use saveAsNewAPIHadoopDataset with speculation enable.It may cause data loss. I check the comment of thi api: We should make sure our tasks are idempotent when speculation is enabled, i.e. do * not use output committer that writes data directly. * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad * result of using direct output committer with speculation enabled. */ But if this the rule we must follow? For example,for parquet it will got ParquetOutPutCommitter. In this case, speculation must disable for parquet? ParquetOutputCommitter is a subclass of Hadoop's FileOutputCommitter, so you get the choice of its two algorithms, as set by spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version algorithm 1 : - tasks write to _temporary/$jobId/_temporary/$taskId directory, - task commit to _temporary/$jobId$taskId in what for a real FS is an O(1) atomic operation. ; speculation and retry straightforward. -job commit: copy the contents of all the task ID directories to the destination, create _SUCCESS file job commit is non-atomic, If a job fails during commit you need to delete the dest dir and try again. alogirthm2: : - tasks write to _temporary/$jobId/_temporary/$taskId directory, -task commit: merge to dest directory, potentially while other tasks are doing a merge at the same time. -Job commit does nothing but create the _SUCCESS file, and can be repeated. you can speculate with either, but if a task using algorithm 2 fails during task commit then there's a problem, as the store is in an unknown state. Neither MapReduce nor Spark worry about this. Usually its fast so the window of failure pretty small, when you are working with object stores that doesn't hold. Really they should react to that failure by aborting the job, but as object stores tend to have their own issues, this is more of a detail than the underlying flaw. As I said, you can read https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_003/a_zero_rename_committer.pdf and a precursor attempt to document what goes in the depths of FileOutpuitCommitter (which has an error in one of the code samples; I forget which. The paper fixes that) http://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/committer_architecture.html + an IBM paper on their Swift committer for spark: Stocator: A High Performance Object Store Connector for Spark: https://arxiv.org/pdf/1709.01812 I have some issues with that paper, but its worthwhile looking at to see their focus on rollback over temp directories http://steveloughran.blogspot.co.uk/2017/09/stocator-high-performance-object-store.html Is there some one know the history? If you check out hadoop, you can get the history after the svn -> git migration, though the earlier history is lost in folklore, primarily stories of "what went wrong" at Yahoo!. https://github.com/apache/hadoop/commits/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java https://github.com/apache/hadoop/commits/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java For spark, look at https://issues.apache.org/jira/browse/SPARK-4879 and the git logs of core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala Once you start looking at the commit protocols, you end up in a fascinating world where things like proof of correctness start to matter. Sadly, everyone is constrained not just but our lack of everyday use of the language and tools, but by the lack of a foundation of specs of the underlying storage systems. There is one for a model of a consistent s3 store, https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf , but I couldn't work out how to define an eventually consistent one in TLA+.. Contributions welcome. -Steve (*) the Hadoop Filesystem spec is actually Z disguised as Python, but it doesn't integrate with any toolchain you can use for correctness proofs. But it is read, understood and maintained by developers, which I consider a success. It's just, we could do more.