[ https://issues.apache.org/jira/browse/SPARK-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cheng Lian updated SPARK-8406:
------------------------------
Description:
To support appending, the Parquet data source tries to find the maximum part
number among the part-files in the destination directory (the <id> in the output
file name "part-r-<id>.gz.parquet") at the beginning of the write job. In 1.3.0,
this step happens on the driver side before any files are written. In 1.4.0,
however, it was moved to the task side. As a result, tasks scheduled later may
see a wrong maximum part number, because it includes files newly written by
other tasks of the same job that have already finished. This is a race
condition. In most cases it only causes nonconsecutive IDs in output file
names. But when the DataFrame contains thousands of RDD partitions, two tasks
are likely to choose the same part number, so the output file of one of them
gets overwritten by the other.
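A minimal sketch of how task-side numbering can produce a collision. It assumes
(this is not taken from the Spark source) that each task names its file
part-r-<maxExisting + split + 1>, where {{split}} is the task's partition index
and {{maxExisting}} is the largest part number the task finds when it starts.
{{TaskSideNaming}}, {{maxPartNumber}} and {{partFileName}} are hypothetical
names.
{code}
// Hypothetical model of task-side part numbering; the naming rule
// maxExisting + split + 1 is an ASSUMPTION, not Spark's actual code.
object TaskSideNaming {
  // Matches a whole file name such as "part-r-00016.gz.parquet"
  private val PartFile = """part-r-(\d+)\.gz\.parquet""".r

  /** Largest part number among `files` (0 if there are no part-files). */
  def maxPartNumber(files: Iterable[String]): Int =
    files.collect { case PartFile(id) => id.toInt }.foldLeft(0)(_ max _)

  /** File name a task picks, given its partition index and the max it observed. */
  def partFileName(split: Int, maxExisting: Int): String =
    f"part-r-${maxExisting + split + 1}%05d.gz.parquet"
}
{code}
For example, a task for partition 10 that starts while the largest existing
part number is 5, and a task for partition 1 that starts once it has grown to
14, both pick part-r-00016.gz.parquet, so one of the two files is lost.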
The following Spark shell snippet can reproduce nonconsecutive part numbers:
{code}
sqlContext.range(0, 128).repartition(16).write.mode("overwrite").parquet("foo")
{code}
"16" can be replaced with any integer greater than the default parallelism on
your machine (usually the number of cores; on my machine it's 8). Listing of the
output directory (note that part numbers 00009 to 00016 are missing):
{noformat}
-rw-r--r-- 3 lian supergroup 0 2015-06-17 00:06 /user/lian/foo/_SUCCESS
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00001.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00002.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00003.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00004.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00005.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00006.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00007.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00008.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00017.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00018.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00019.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00020.gz.parquet
-rw-r--r-- 3 lian supergroup 352 2015-06-17 00:06 /user/lian/foo/part-r-00021.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00022.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00023.gz.parquet
-rw-r--r-- 3 lian supergroup 353 2015-06-17 00:06 /user/lian/foo/part-r-00024.gz.parquet
{noformat}
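The jump from 00008 to 00017 is consistent with a simple wave model, sketched
below under stated assumptions: tasks run in waves of {{slots}} (the default
parallelism, 8 here), every task of a wave scans the directory after the
previous wave has written its files, and each task uses the assumed rule
maxExisting + split + 1. {{WaveModel}}, {{taskSideIds}} and {{driverSideIds}}
are hypothetical; real scheduling is less regular.
{code}
import scala.collection.mutable.ArrayBuffer

// Idealized model (ASSUMPTIONS: fixed-size waves, naming rule maxExisting + split + 1).
object WaveModel {
  /** 1.4.0-style: each task computes maxExisting itself when its wave starts. */
  def taskSideIds(numPartitions: Int, slots: Int): Seq[Int] = {
    val written = ArrayBuffer.empty[Int]
    for (wave <- (0 until numPartitions).grouped(slots)) {
      // Tasks in this wave only see files written by earlier waves
      val maxExisting = if (written.isEmpty) 0 else written.max
      written ++= wave.map(split => maxExisting + split + 1)
    }
    written.toList
  }

  /** 1.3.0-style: maxExisting is computed once on the driver before any task runs. */
  def driverSideIds(numPartitions: Int, existingMax: Int = 0): Seq[Int] =
    (0 until numPartitions).map(split => existingMax + split + 1)
}
{code}
With 16 partitions and 8 slots, {{taskSideIds(16, 8)}} yields 1 to 8 followed
by 17 to 24, matching the listing, while {{driverSideIds(16)}} yields 1 to 16.
With 8 or fewer partitions everything runs in a single wave and no gap appears,
which is why the partition count must exceed the default parallelism.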
And here is another Spark shell snippet that reproduces the overwriting:
{code}
sqlContext.range(0, 10000).repartition(500).write.mode("overwrite").parquet("foo")
sqlContext.read.parquet("foo").count()
{code}
The expected answer is {{10000}}, but you may see a smaller number like {{9960}}
because some part-files were overwritten. The actual number varies across runs
and nodes.
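To see why the count drops, here is an illustrative sketch that assumes a
later write to an existing file name replaces that file. The input pairs are
made up and are not data from the run above; {{LastWriterWins}} and
{{survivingRows}} are hypothetical names.
{code}
// Illustrative model: writes are (fileName, rowCount) pairs in completion order.
// ASSUMPTION: a later write with an existing file name replaces the earlier file.
object LastWriterWins {
  def survivingRows(writes: Seq[(String, Long)]): Long =
    writes
      .foldLeft(Map.empty[String, Long]) { case (files, (name, rows)) =>
        files.updated(name, rows) // same name: the earlier file's rows are lost
      }
      .values
      .sum
}
{code}
Three writes of 20 rows each, two of which share a file name, leave 40 rows
instead of 60. With 10000 rows spread over 500 partitions, each part-file holds
20 rows on average, so every collision removes roughly that many rows from the
final count.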
Note that the newly added ORC data source is less likely to hit this issue
because it uses the task ID and {{System.currentTimeMillis()}} to generate the
output file name. Thus, the ORC data source can hit this issue only when two
tasks with the same task ID (which means they belong to two concurrent jobs)
write to the same location within the same millisecond.
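A sketch of why such a scheme rarely collides, assuming the name is derived only
from the task ID and a millisecond timestamp. The format string and the
{{OrcStyleNaming}} object are hypothetical; the issue does not give the actual
ORC file name layout.
{code}
// Hypothetical ORC-style naming: the format is invented for illustration.
object OrcStyleNaming {
  /** Deterministic variant, so the collision condition can be checked directly. */
  def fileName(taskId: Int, millis: Long): String =
    f"part-$taskId%05d-${millis}.orc"

  /** What a task would call at write time. */
  def fileName(taskId: Int): String = fileName(taskId, System.currentTimeMillis())
}
{code}
Two names are equal only when both the task ID and the millisecond match, which
is exactly the condition described above.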
> Race condition when writing Parquet files
> -----------------------------------------
>
> Key: SPARK-8406
> URL: https://issues.apache.org/jira/browse/SPARK-8406
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.4.0
> Reporter: Cheng Lian
> Assignee: Cheng Lian
> Priority: Blocker
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)