[
https://issues.apache.org/jira/browse/SPARK-10063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349550#comment-16349550
]
Henrique dos Santos Goulart edited comment on SPARK-10063 at 2/2/18 12:11 AM:
------------------------------------------------------------------------------
Is there any alternative right now that works for Parquet with partitionBy? It works very well if I set algorithm version 2 and do not use partitionBy, but if I use
dataset.write.partitionBy(..).option("..algorithm.version", "2").parquet(...) it will
still create temporary folders =(
Reference question:
[https://stackoverflow.com/questions/48573643/spark-dataset-parquet-partition-on-s3-creating-temporary-folder]
[~rxin] [~yhuai] [[email protected]] [~chiragvaya]
was (Author: henriquedsg89):
Is there any alternative right now that works for Parquet with partitionBy? If I use
dataset.write.partitionBy(..).option("..algorithm.version", "2").parquet(...) it will
create temporary folders =(
Reference question:
https://stackoverflow.com/questions/48573643/spark-dataset-parquet-partition-on-s3-creating-temporary-folder
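For context on the writer configuration discussed in the comment above, here is a minimal sketch, assuming a Spark 2.x SparkSession named {{spark}} and setting the committer algorithm version on the Hadoop configuration rather than as a writer option; the bucket path, partition column, and {{dataset}} value are placeholders, not taken from this issue:
{code}
// Sketch only: the s3a path, partition column, and "dataset" value are
// placeholders, not taken from this issue.
// Setting the FileOutputCommitter algorithm to version 2 on the Hadoop
// configuration applies to every write in the session. Tasks commit their
// files directly into the destination directory, but a _temporary staging
// directory is still created under the output path while the job runs,
// which matches the behavior described above.
spark.sparkContext.hadoopConfiguration
  .set("mapreduce.fileoutputcommitter.algorithm.version", "2")

dataset.write
  .partitionBy("event_date")
  .parquet("s3a://my-bucket/warehouse/events")
{code}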
> Remove DirectParquetOutputCommitter
> -----------------------------------
>
> Key: SPARK-10063
> URL: https://issues.apache.org/jira/browse/SPARK-10063
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Reporter: Yin Huai
> Assignee: Reynold Xin
> Priority: Critical
> Fix For: 2.0.0
>
>
> When we use DirectParquetOutputCommitter on S3 and speculation is enabled,
> there is a chance that we can lose data.
> Here is the code to reproduce the problem.
> {code}
> import org.apache.spark.sql.functions._
>
> val failSpeculativeTask = sqlContext.udf.register("failSpeculativeTask",
>   (i: Int, partitionId: Int, attemptNumber: Int) => {
>     if (partitionId == 0 && i == 5) {
>       if (attemptNumber > 0) {
>         Thread.sleep(15000)
>         throw new Exception("new exception")
>       } else {
>         Thread.sleep(10000)
>       }
>     }
>     i
>   })
>
> val df = sc.parallelize((1 to 100), 20).mapPartitions { iter =>
>   val context = org.apache.spark.TaskContext.get()
>   val partitionId = context.partitionId
>   val attemptNumber = context.attemptNumber
>   iter.map(i => (i, partitionId, attemptNumber))
> }.toDF("i", "partitionId", "attemptNumber")
>
> df
>   .select(failSpeculativeTask($"i", $"partitionId", $"attemptNumber").as("i"),
>     $"partitionId", $"attemptNumber")
>   .write.mode("overwrite").format("parquet").save("/home/yin/outputCommitter")
>
> sqlContext.read.load("/home/yin/outputCommitter").count
> // The result is 99 and 5 is missing from the output.
> {code}
> What happened is that the original task finishes first and uploads its output
> file to S3, then the speculative task somehow fails. Because we have to call
> the output stream's close method, which uploads data to S3, we actually upload
> the partial result generated by the failed speculative task to S3, and this
> file overwrites the correct file generated by the original task.
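To make the failure mode above concrete, here is a minimal, self-contained sketch of the commit-by-rename protocol that the default FileOutputCommitter follows and that DirectParquetOutputCommitter skips; the object and method names are illustrative only, not Spark or Hadoop APIs:
{code}
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

// Illustration only: these names are hypothetical, not Spark or Hadoop APIs.
// Each task attempt writes into its own temporary location; only the attempt
// that the driver declares the winner is renamed into the final path, so a
// failed speculative attempt's partial file can never replace committed
// output. A "direct" committer writes straight to the final path and loses
// that protection, which is the race described above.
object CommitByRenameSketch {

  // Every attempt (original or speculative) gets an isolated temporary file.
  def writeAttempt(outputDir: String, part: Int, attempt: Int, data: Array[Byte]): Path = {
    val tmp = Paths.get(outputDir, "_temporary", s"part-$part-attempt-$attempt")
    Files.createDirectories(tmp.getParent)
    Files.write(tmp, data)
  }

  // Only the winning attempt is promoted into the final output directory.
  def commitAttempt(outputDir: String, part: Int, tmp: Path): Path = {
    val dest = Paths.get(outputDir, s"part-$part")
    Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE)
  }

  // Failed or losing attempts simply have their temporary output discarded.
  def abortAttempt(tmp: Path): Unit =
    Files.deleteIfExists(tmp)
}
{code}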