sorry, not noticed this followup. Been busy with other issues

On 3 Apr 2018, at 11:19, cane 
<zhoukang199...@gmail.com<mailto:zhoukang199...@gmail.com>> wrote:

Now, if we use saveAsNewAPIHadoopDataset with speculation enable.It may cause
data loss.
I check the comment of thi api:

 We should make sure our tasks are idempotent when speculation is enabled,
i.e. do
  * not use output committer that writes data directly.
  * There is an example in
https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
  * result of using direct output committer with speculation enabled.
  */

But if this the rule we must follow?
For example,for parquet it will got ParquetOutPutCommitter.
In this case, speculation must disable for parquet?

ParquetOutputCommitter is a subclass of Hadoop's FileOutputCommitter, so you 
get the choice of its two algorithms, as set by 
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version


algorithm 1 :
- tasks write to _temporary/$jobId/_temporary/$taskId directory,
- task commit to _temporary/$jobId$taskId in what for a real FS is an O(1) 
atomic operation. ; speculation and retry straightforward.
 -job commit: copy the contents of all the task ID directories to the 
destination, create _SUCCESS file
 job commit is non-atomic, If a job fails during commit you need to delete the 
dest dir and try again.

alogirthm2: :
- tasks write to _temporary/$jobId/_temporary/$taskId directory,
 -task commit: merge to dest directory, potentially while other tasks are doing 
a merge at the same time.
 -Job commit does nothing but create the _SUCCESS file, and can be repeated.

you can speculate with either, but if a task using algorithm 2 fails during 
task commit then there's a problem, as the store is in an unknown state. 
Neither MapReduce nor Spark worry about this. Usually its fast so the window of 
failure pretty small, when you are working with object stores that doesn't 
hold. Really they should react to that failure by aborting the job, but as 
object stores tend to have their own issues, this is more of a detail than the 
underlying flaw.

As I said, you can read

https://github.com/steveloughran/zero-rename-committer/releases/download/tag_draft_003/a_zero_rename_committer.pdf

and a precursor attempt to document what goes in the depths of 
FileOutpuitCommitter (which has an error in one of the code samples; I forget 
which. The paper fixes that)

http://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/committer_architecture.html

+ an IBM paper on their Swift committer for spark:  Stocator: A High 
Performance Object Store Connector for Spark: https://arxiv.org/pdf/1709.01812


I have some issues with that paper, but its worthwhile looking at to see their 
focus on rollback over temp directories
http://steveloughran.blogspot.co.uk/2017/09/stocator-high-performance-object-store.html



Is there some one know the history?

If you check out hadoop, you can get the history after the svn -> git 
migration, though the earlier history is lost in folklore, primarily stories of 
"what went wrong" at Yahoo!.

https://github.com/apache/hadoop/commits/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
https://github.com/apache/hadoop/commits/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java

For spark, look at
https://issues.apache.org/jira/browse/SPARK-4879

and the git logs of

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Once you start looking at the commit protocols, you end up in a fascinating 
world where things like proof of correctness start to matter. Sadly, everyone 
is constrained not just but our lack of everyday use of the language and tools, 
but by the lack of a foundation of specs of the underlying storage systems. 
There is one for a model of a consistent s3 store, 
https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf , but 
I couldn't work out how to define an eventually consistent one in TLA+.. 
Contributions welcome.

-Steve

(*) the Hadoop Filesystem spec is actually Z disguised as Python, but it 
doesn't integrate with any toolchain you can use for correctness proofs. But it 
is read, understood and maintained by developers, which I consider a success. 
It's just, we could do more.

Reply via email to