[ https://issues.apache.org/jira/browse/SPARK-22828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yongqin Xiao updated SPARK-22828:
---------------------------------
    Shepherd: Jiang Xingbo

> Data corruption happens when the same RDD is repeatedly used as a parent RDD of a custom RDD which reads each parent RDD in concurrent threads
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22828
>                 URL: https://issues.apache.org/jira/browse/SPARK-22828
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Yongqin Xiao
>              Labels: spark
>
> I have defined a custom RDD that:
> - computes its output from the input data using our traditional data transformation code; as an extreme example, this custom RDD can behave as a union, a joiner, etc.
> - takes one or more parent RDDs as input, where some or all of the parent RDDs can be the same RDD
> - reads the input parent RDDs in concurrent threads (i.e. reader threads)
> - computes the data in one or more transformation threads that run concurrently with the reader threads
> - ...
>
> In certain cases, we see {color:red}data being corrupted{color} when our reader threads read it in. The corruption happens only when all of the following conditions are met:
> - Multiple parent RDDs of the custom RDD are actually the same RDD, e.g. a same-source union. The Scala code is roughly:
> {code:java}
> val rdd1 = ...
> val customRdd = new MyRdd(rdd1, rdd1, ...)
> {code}
> - The parent RDD is not the result of repartitioning or sorting-within-partition.
> - There is no persistence on the shared parent RDD.
> - spark.sql.shuffle.partitions is set to 1. We also saw corruption when the value is set to a small number such as 2, which is also the source partition count.
>
> This data corruption happens even when the number of executors and cores is set to 1, meaning the corruption is not related to multiple partitions running concurrently.
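The symptoms above are consistent with two reader threads consuming what is effectively one shared stream of rows. The report does not show the Spark internals involved, but the underlying hazard can be sketched in plain Java (an illustrative sketch, not the reporter's actual MyRdd or Spark code): when two threads pull from a single shared `Iterator`, the rows get split between them, and each reader sees incomplete (possibly duplicated or skipped) data.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch of the suspected hazard: two reader threads pulling from
// ONE shared iterator, analogous to the same unpersisted parent RDD being
// handed to a custom RDD twice and read by concurrent reader threads.
public class SharedIteratorHazard {

    static void drain(Iterator<Integer> it, List<Integer> sink) {
        // Deliberately unsynchronized, like independent reader threads.
        try {
            while (it.hasNext()) sink.add(it.next());
        } catch (RuntimeException e) {
            // Racing hasNext()/next() can also throw near the end of the data.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) source.add(i);

        Iterator<Integer> shared = source.iterator(); // one iterator, two readers
        List<Integer> readerA = new ArrayList<>();
        List<Integer> readerB = new ArrayList<>();
        Thread a = new Thread(() -> drain(shared, readerA));
        Thread b = new Thread(() -> drain(shared, readerB));
        a.start(); b.start(); a.join(); b.join();

        // Neither reader can have seen the full data: the rows were split
        // between the two threads, so each view of the "parent" is corrupt.
        boolean corrupted = readerA.size() != source.size()
                         || readerB.size() != source.size();
        System.out.println("readerA=" + readerA.size()
                + " readerB=" + readerB.size() + " corrupted=" + corrupted);
    }
}
```

This also suggests why a persisted parent or distinct `select` copies avoid the problem: each reader then gets its own independent iterator instead of sharing one.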
> Data corruption does not happen when any of the following conditions is met:
> 1. Instead of passing the same parent RDD as multiple inputs to my custom RDD, we do a select (of all columns) on that parent RDD and use the distinct select RDDs as inputs:
> {code:java}
> val rdd1 = ...
> val customRdd = new MyRdd(rdd1.select($1, $2, ...), rdd1.select($1, $2), ...)
> {code}
> 2. We persist the parent RDD:
> {code:java}
> val rdd1 = ...
> rdd1.persist(...)
> val customRdd = new MyRdd(rdd1, rdd1, ...)
> {code}
> 3. We use a single thread to read the parent RDDs in the custom RDD implementation.
> 4. We use our default value (100) for spark.sql.shuffle.partitions.
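Workarounds 1 and 2 above share one effect: each reader thread ends up with its own independent stream over the same rows. Continuing the plain-Java sketch (illustrative only, not Spark API), materializing the shared input once and handing every reader a fresh iterator over it, roughly what `rdd1.persist()` achieves, lets concurrent readers each see the complete data:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java sketch of why the workarounds help: the shared input is
// materialized once, and each reader thread gets its own fresh iterator
// over it -- roughly the effect of persisting the shared parent RDD.
public class IndependentReaders {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> persisted = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) persisted.add(i);

        // Every call yields a brand-new iterator over the materialized rows.
        Supplier<Iterator<Integer>> freshIterator = persisted::iterator;

        List<Integer> readerA = new ArrayList<>();
        List<Integer> readerB = new ArrayList<>();
        Thread a = new Thread(() -> freshIterator.get().forEachRemaining(readerA::add));
        Thread b = new Thread(() -> freshIterator.get().forEachRemaining(readerB::add));
        a.start(); b.start(); a.join(); b.join();

        // Both readers now see all rows, uncorrupted.
        boolean ok = readerA.size() == persisted.size()
                  && readerB.size() == persisted.size();
        System.out.println("readerA=" + readerA.size()
                + " readerB=" + readerB.size() + " ok=" + ok);
    }
}
```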