Very interesting, I will give it a try. Thanks for pointing this out.
Also, are you planning to contribute it to Spark, and could it become a good
default option for Spark S3 copies?
Do you have any benchmarking that shows the improvement in the job?

Thanks,
Yash

On Sat, 8 Apr 2017 at 02:38 Ryan Blue <rb...@netflix.com> wrote:

> Yash,
>
> We (Netflix) built a committer that uses the S3 multipart upload API to
> avoid the copy problem and still handle task failures. You can build and
> use the code posted here:
>
>   https://github.com/rdblue/s3committer
>
> You're probably interested in the S3PartitionedOutputCommitter.
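>
> Roughly, it can be wired in from Spark along these lines; the exact property
> keys and fully qualified class name below should be checked against the
> s3committer README for your Spark version:
>
>   import org.apache.spark.sql.SparkSession
>
>   val spark = SparkSession.builder()
>     .appName("s3committer-example")
>     // Committer for non-Parquet datasource writes; depending on the Spark
>     // version this key may need to be passed with a "spark.hadoop." prefix.
>     .config("spark.sql.sources.outputCommitterClass",
>       "com.netflix.bdp.s3.S3PartitionedOutputCommitter")
>     // Committer class used for Parquet writes.
>     .config("spark.sql.parquet.output.committer.class",
>       "com.netflix.bdp.s3.S3PartitionedOutputCommitter")
>     .getOrCreate()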
>
> rb
>
> On Thu, Apr 6, 2017 at 10:08 PM, Yash Sharma <yash...@gmail.com> wrote:
>
> Hi All,
> This is another issue I have been facing with Spark-S3 interoperability,
> and I wanted to ask the broader community whether anyone else has run into it.
>
> I have a rather simple aggregation query with a basic transformation, but
> the output has a large number of partitions (20K). The Spark job runs very
> fast and reaches the end without any failures; up to that point it writes
> to the staging directory and everything looks fine.
>
> As soon as Spark starts renaming these files, it hits two issues:
> 1. Single-path renames on S3 are extremely slow, so the job spends a huge
> amount of time just renaming these files.
> 2. Renames sometimes fail: Spark appears to check the files after writing
> them (not sure), and a few renames fail randomly because of S3's eventual
> consistency, causing the job to fail intermittently. [logs added at the end]
>
> I was wondering what workarounds exist for this problem, and whether it is
> possible to override this behavior and write files directly to the expected
> paths, skipping the _temporary staging directory.
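>
> For example, would switching to the "version 2" algorithm of Hadoop's
> FileOutputCommitter help here? It merges task output into the destination at
> task commit instead of doing a second rename pass at job commit. A sketch of
> what I mean (just the standard Hadoop property passed through Spark; it would
> reduce the rename work but presumably not the eventual-consistency failures):
>
>   import org.apache.spark.sql.SparkSession
>
>   // Sketch: switch FileOutputCommitter to its v2 algorithm so task output
>   // is moved to the final location at task commit, avoiding the large
>   // rename pass during job commit. It does not make S3 renames atomic.
>   val spark = SparkSession.builder()
>     .appName("fewer-commit-renames")
>     .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
>     .getOrCreate()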
>
> Cheers,
> Yash
>
> {logs}
> java.io.IOException: Failed to rename
> FileStatus{path=s3://instances/_temporary/0/task_201704060437_0005_m_000052/utc_date=2012-06-19/product=obsolete;
> isDirectory=true; modification_time=0; access_time=0; owner=; group=;
> permission=rwxrwxrwx; isSymlink=false} to
> s3://instances/utc_date=2012-06-19/product=obsolete
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.renameOrMerge(FileOutputCommitter.java:441)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:432)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
> ...
> ...
> InsertIntoHadoopFsRelationCommand.scala:115)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
> ...
> ...
> at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
> 17/04/06 04:41:54 ERROR DynamicPartitionWriterContainer: Job
> job_201704060436_0000 aborted.
> 17/04/06 04:41:54 ERROR ActiveInstances$: Exception in running
> ActiveInstances.
> org.apache.spark.SparkException: Job aborted.
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
>
> {logs}
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
