[
https://issues.apache.org/jira/browse/SPARK-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628467#comment-14628467
]
Yin Huai edited comment on SPARK-7837 at 7/15/15 6:36 PM:
----------------------------------------------------------
[~mkanchwala] There is a bug
(https://issues.apache.org/jira/browse/SPARK-8406) that can cause data loss for
large jobs. Please upgrade to Spark 1.4.1 as soon as it is released (I think
today or tomorrow), or manually apply the fix
(https://github.com/nemccarthy/spark/commit/ba365909b964fe5a5851d88f5f7b7edcd1998142)
to your Spark 1.4.0 source code.
Regarding the slowness of saving Parquet files to S3, a possible cause is that
Parquet's original output committer
({{org.apache.parquet.hadoop.ParquetOutputCommitter}}) first writes data to a
temporary directory and then moves it to its final location when tasks are
committed. On S3, that move is a copy, and it is unnecessary in most cases
because "S3 supports multiple writers outputting to the same file, where
visibility is guaranteed to be atomic"
(https://gist.github.com/aarondav/c513916e72101bbe14ec). Once you upgrade to
Spark 1.4.1, you can set {{spark.sql.parquet.output.committer.class}} to
{{org.apache.spark.sql.parquet.DirectParquetOutputCommitter}} in your Hadoop
conf, which makes Spark write output files directly to their final locations.
The only case where DirectParquetOutputCommitter is not safe to use is when you
append data to an existing table; in that case, Spark 1.4.1 internally switches
back to the original Parquet output committer.
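As a rough sketch of the setting above (assuming Spark 1.4.1; the app name and
the S3 paths are placeholders, not from this issue):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("direct-committer-sketch"))

// Point Spark SQL's Parquet writer at the direct committer via the Hadoop conf,
// so output files are written straight to their final S3 locations.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

val sqlContext = new SQLContext(sc)
// Hypothetical paths; for appends Spark falls back to ParquetOutputCommitter.
sqlContext.read.parquet("s3n://my-bucket/input")
  .write.parquet("s3n://my-bucket/output")
```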
> NPE when save as parquet in speculative tasks
> ---------------------------------------------
>
> Key: SPARK-7837
> URL: https://issues.apache.org/jira/browse/SPARK-7837
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.4.0
> Reporter: Yin Huai
> Priority: Critical
>
> The query is like {{df.orderBy(...).saveAsTable(...)}}.
> When there are no partitioning columns and there is a skewed key, I found the
> following exception in speculative tasks. After these failures, it seems we
> cannot call {{SparkHadoopMapRedUtil.commitTask}} correctly.
> {code}
> java.lang.NullPointerException
> at
> parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
> at
> parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
> at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
> at
> org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:115)
> at
> org.apache.spark.sql.sources.DefaultWriterContainer.abortTask(commands.scala:385)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:150)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:122)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:122)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)