[ https://issues.apache.org/jira/browse/SPARK-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628467#comment-14628467 ]

Yin Huai edited comment on SPARK-7837 at 7/15/15 6:36 PM:
----------------------------------------------------------

[~mkanchwala] There is a bug 
(https://issues.apache.org/jira/browse/SPARK-8406) that may cause data loss for 
large jobs. Please use Spark 1.4.1 as soon as it is released (I think today or 
tomorrow), or manually apply the fix 
(https://github.com/nemccarthy/spark/commit/ba365909b964fe5a5851d88f5f7b7edcd1998142)
 to your Spark 1.4.0 source code.

Regarding the slowness of saving Parquet files to S3, a possible cause is that 
Parquet's original output committer 
({{org.apache.parquet.hadoop.ParquetOutputCommitter}}) first writes data to a 
temporary dir and then moves it to its final place when it commits tasks. This 
behavior is unnecessary in most cases for S3 because "S3 supports multiple 
writers outputting to the same file, where visibility is guaranteed to be 
atomic" (https://gist.github.com/aarondav/c513916e72101bbe14ec). Once you 
upgrade to Spark 1.4.1, you can set 
{{spark.sql.parquet.output.committer.class}} to 
{{org.apache.spark.sql.parquet.DirectParquetOutputCommitter}} in your Hadoop 
conf, which will write output files directly to their final locations. The only 
case in which it is not safe to use DirectParquetOutputCommitter is when you 
append data to an existing table; in that case, Spark 1.4.1 will internally 
switch back to the original Parquet output committer.
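
Since the switch is just a Hadoop configuration entry, a minimal sketch of the 
corresponding property (assuming Spark 1.4.1 and that your Hadoop conf is 
loaded from {{core-site.xml}}) would look like:

{code:xml}
<!-- Hadoop conf: route Spark SQL's Parquet output through the direct
     committer, which writes files straight to their final S3 locations.
     Not safe when appending to an existing table; Spark 1.4.1 falls back
     to the original committer in that case. -->
<property>
  <name>spark.sql.parquet.output.committer.class</name>
  <value>org.apache.spark.sql.parquet.DirectParquetOutputCommitter</value>
</property>
{code}

The same property can equivalently be set programmatically on 
{{sc.hadoopConfiguration}} before writing.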


was (Author: yhuai):
[~mkanchwala] There is a bug 
(https://issues.apache.org/jira/browse/SPARK-8406) that may cause data loss for 
large jobs. Please use Spark 1.4.1 as soon as it is released (I think today or 
tomorrow), or manually apply the fix 
(https://github.com/nemccarthy/spark/commit/ba365909b964fe5a5851d88f5f7b7edcd1998142)
 to your Spark 1.4.0 source code.

Regarding the slowness of saving Parquet files to S3, a possible cause is that 
Parquet's original output committer 
({{org.apache.parquet.hadoop.ParquetOutputCommitter}}) first writes data to a 
temporary dir and then moves it to its final place when it commits tasks. This 
behavior is unnecessary in most cases for S3 because "S3 supports multiple 
writers outputting to the same file, where visibility is guaranteed to be 
atomic" (https://gist.github.com/aarondav/c513916e72101bbe14ec). Once you 
upgrade to Spark 1.4.1, you can set 
{{spark.sql.parquet.output.committer.class}} to 
{{org.apache.spark.sql.parquet.DirectParquetOutputCommitter}}, which will write 
output files directly to their final locations. The only case in which it is 
not safe to use DirectParquetOutputCommitter is when you append data to an 
existing table; in that case, Spark 1.4.1 will internally switch back to the 
original Parquet output committer.

> NPE when save as parquet in speculative tasks
> ---------------------------------------------
>
>                 Key: SPARK-7837
>                 URL: https://issues.apache.org/jira/browse/SPARK-7837
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0
>            Reporter: Yin Huai
>            Priority: Critical
>
> The query is like {{df.orderBy(...).saveAsTable(...)}}.
> When there are no partitioning columns and there is a skewed key, I found the 
> following exception in speculative tasks. After these failures, it seems we 
> could not call {{SparkHadoopMapRedUtil.commitTask}} correctly.
> {code}
> java.lang.NullPointerException
>       at 
> parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
>       at 
> parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
>       at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
>       at 
> org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:115)
>       at 
> org.apache.spark.sql.sources.DefaultWriterContainer.abortTask(commands.scala:385)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:150)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:122)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:122)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>       at org.apache.spark.scheduler.Task.run(Task.scala:70)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
