[ https://issues.apache.org/jira/browse/SPARK-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian updated SPARK-8406:
------------------------------
    Description: 
To support appending, the Parquet data source tries to find the maximum part 
number among existing part-files in the destination directory (the <id> in the 
output file name "part-r-<id>.gz.parquet") at the beginning of the write job. 
In 1.3.0, this step happens on the driver side before any files are written. 
In 1.4.0, however, it was moved to the task side. As a result, tasks scheduled 
later may see a wrong maximum part number that already includes files newly 
written by other tasks that finished earlier within the same job. This is a 
race condition. In most cases it only produces nonconsecutive IDs in output 
file names, but when the DataFrame contains thousands of RDD partitions, it 
becomes likely that two tasks choose the same part number, and one output file 
gets overwritten by the other.
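
The following sketch (hypothetical, not the actual Spark 1.4.0 internals) 
illustrates how a writer might pick the next part number by scanning the 
destination directory; when every task runs such a scan on its own while other 
tasks are already writing, two tasks can observe the same maximum and pick the 
same part number:
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch: derive the next part number from existing output files.
// Running this per task (instead of once on the driver before any files are
// written) is what makes the result racy.
def nextPartNumber(outputDir: String): Int = {
  val partFile = """part-r-(\d+).*""".r
  val fs = FileSystem.get(new Configuration())
  val names = fs.listStatus(new Path(outputDir)).map(_.getPath.getName)
  val maxId = names.collect { case partFile(id) => id.toInt }
    .reduceOption(_ max _)
    .getOrElse(0)
  maxId + 1
}
{code}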

The data loss situation is not easy to reproduce, but the following Spark 
shell snippet reproduces the nonconsecutive output file IDs:
{code}
sqlContext.range(0, 128).repartition(16).write.mode("overwrite").parquet("foo")
{code}
"16" can be replaced with any integer that is greater than the default 
parallelism on your machine (usually it means core number, on my machine it's 
8).
{noformat}
-rw-r--r--   3 lian supergroup          0 2015-06-17 00:06 /user/lian/foo/_SUCCESS
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00001.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00002.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00003.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00004.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00005.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00006.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00007.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00008.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00017.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00018.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00019.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00020.gz.parquet
-rw-r--r--   3 lian supergroup        352 2015-06-17 00:06 /user/lian/foo/part-r-00021.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00022.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00023.gz.parquet
-rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00024.gz.parquet
{noformat}
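
As a side note, one way to choose a repartition factor above the default 
parallelism, assuming the standard Spark shell bindings {{sc}} and 
{{sqlContext}}:
{code}
// Check the default parallelism of the current session, then repartition to
// something larger so that more tasks than cores are scheduled.
println(sc.defaultParallelism)   // e.g. 8 on an 8-core machine

sqlContext.range(0, 128)
  .repartition(sc.defaultParallelism * 2)
  .write.mode("overwrite").parquet("foo")
{code}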
Note that the newly added ORC data source doesn't suffer from this issue 
because it uses both the part number and {{System.currentTimeMillis()}} to 
generate the output file name.
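
For comparison, here is a rough sketch of that ORC-style naming scheme 
(illustrative only; the exact format string is not taken from the ORC data 
source):
{code}
// Combining the part number with a timestamp makes it very unlikely that two
// tasks in the same job produce the same output file name.
val partNumber = 7  // example value
val fileName = f"part-r-$partNumber%05d-${System.currentTimeMillis()}.orc"
// e.g. "part-r-00007-1434470400000.orc"
{code}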



> Race condition when writing Parquet files
> -----------------------------------------
>
>                 Key: SPARK-8406
>                 URL: https://issues.apache.org/jira/browse/SPARK-8406
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0
>            Reporter: Cheng Lian
>            Assignee: Cheng Lian
>            Priority: Blocker
>


