[ https://issues.apache.org/jira/browse/SPARK-22828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yongqin Xiao updated SPARK-22828:
---------------------------------
    Description: 
I have defined a custom RDD that
- computes the output based on input data using our traditional data
transformation code. To give an extreme example, this custom RDD can behave
as a union, a joiner, etc.
- takes one or more parent RDDs as input, where some or all of the parent
RDDs can be the same
- reads the input parent RDDs in concurrent threads (i.e. reader threads)
- computes the data in one or more transformation threads that run
concurrently with the reader threads (sketched below)
- ...
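
For context, here is a minimal sketch of what such a custom RDD can look like.
The name MyRdd comes from the snippets below; the queue-based merge and the
one-reader-thread-per-parent layout are illustrative assumptions, not our
actual implementation:

{code:java}
import java.util.concurrent.LinkedBlockingQueue

import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch only: merges rows from all parents, one reader thread per parent.
class MyRdd[T: ClassTag](parents: Seq[RDD[T]])
  extends RDD[T](parents.head.sparkContext,
                 parents.map(p => new OneToOneDependency(p))) {

  // Assumes all parents share the same partitioning.
  override protected def getPartitions: Array[Partition] =
    parents.head.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val queue = new LinkedBlockingQueue[Option[T]]()
    // Reader threads: each iterates the same partition index of its parent
    // and pushes rows into a shared queue; None marks end-of-stream.
    parents.foreach { parent =>
      new Thread(new Runnable {
        override def run(): Unit = {
          parent.iterator(split, context).foreach(row => queue.put(Some(row)))
          queue.put(None)
        }
      }).start()
    }
    // Transformation side: drain the queue until every parent has finished.
    new Iterator[T] {
      private var finished = 0
      private var nextRow: Option[T] = advance()
      private def advance(): Option[T] = {
        while (finished < parents.size) {
          queue.take() match {
            case some @ Some(_) => return some
            case None           => finished += 1
          }
        }
        None
      }
      override def hasNext: Boolean = nextRow.isDefined
      override def next(): T = {
        val row = nextRow.get
        nextRow = advance()
        row
      }
    }
  }
}
{code}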

In certain cases, we see {color:red}data being corrupted{color} when our
reader threads read it in. The corruption happens when all of the following
conditions are met:
- Multiple parent RDDs of the custom RDD are actually the same RDD, e.g. a
same-source union.

The Scala code is roughly like this:
{code:java}
// The same RDD instance is passed more than once to the custom RDD.
val rdd1 = ...
val customRdd = new MyRdd(rdd1, rdd1, ...)
{code}

- The parent RDD is not the result of a repartition or a
sort-within-partitions.
- The parent RDD is not persisted.
- spark.sql.shuffle.partitions is set to 1. We saw the corruption as well
when the value is set to a small number such as 2, which is also the source
partition count.

This data corruption happens even when the number of executors and cores is
set to 1, meaning it is not related to multiple partitions running
concurrently.
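
For reference, a configuration along these lines reproduces the failing setup
described above (a sketch, not our exact job setup; local[1] approximates a
single executor with a single core):

{code:java}
import org.apache.spark.sql.SparkSession

// Sketch of the failing configuration: one core, one shuffle partition.
val spark = SparkSession.builder()
  .master("local[1]")                           // single executor core
  .config("spark.sql.shuffle.partitions", "1")  // the corrupting setting
  .getOrCreate()
{code}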


Data corruption does not happen when any of the following conditions is met:
1. Instead of passing the same parent RDD multiple times as input to my
custom RDD, we apply a select (of all columns) to that parent RDD and pass
the distinct select results as inputs.

The Scala code is like this:
{code:java}
// Each input is a distinct select over the same parent.
val rdd1 = ...
val customRdd = new MyRdd(rdd1.select($1, $2, ...), rdd1.select($1, $2), ...)
{code}

2. We persist the parent RDD:

{code:java}
val rdd1 = ...
rdd1.persist(...)
val customRdd = new MyRdd(rdd1, rdd1, ...)
{code}

3. We use a single thread to read the parent RDDs inside the custom RDD
implementation.
4. We use our default value (100) for spark.sql.shuffle.partitions (see the
sketch below).
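
For completeness, workaround 4 amounts to a one-line configuration change
(spark is the SparkSession; 100 is our product default, not Spark's):

{code:java}
// Workaround 4, sketched: restore our default shuffle partition count.
spark.conf.set("spark.sql.shuffle.partitions", "100")
{code}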



> Data corruption happens when the same RDD is repeatedly used as a parent RDD 
> of a custom RDD that reads each parent RDD in concurrent threads
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22828
>                 URL: https://issues.apache.org/jira/browse/SPARK-22828
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Yongqin Xiao
>              Labels: spark


