[ https://issues.apache.org/jira/browse/SPARK-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148303#comment-14148303 ]

Sandy Ryza commented on SPARK-3693:
-----------------------------------

Spark's documentation actually makes a note of this.

{code}
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD will create many references to the same object.
   * If you plan to directly cache Hadoop writable objects, you should first copy them using
   * a `map` function.
{code}

Having MemoryStore make a copy would degrade performance in all the situations 
where a copy isn't needed. While it's not pretty, I don't see a better approach 
than the current behavior, where the user needs to explicitly make a copy. Though 
we could possibly provide some utility to help if that requires a lot of user 
boilerplate?
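
To make that concrete, a workaround against the reporter's snippet could look 
roughly like the sketch below. This is only a sketch, not an API proposal: 
copyForCaching is a hypothetical helper name, and it assumes BytesWritable 
keys/values plus Hadoop's BytesWritable.copyBytes() (with imports of 
org.apache.spark.api.java.JavaPairRDD, org.apache.spark.api.java.function.PairFunction, 
org.apache.hadoop.io.BytesWritable and scala.Tuple2).

{code}
// Sketch: copy each Writable before caching, so the cached rows no longer share
// the single (key, value) objects that the RecordReader keeps reusing.
static JavaPairRDD<BytesWritable, BytesWritable> copyForCaching(
        JavaPairRDD<BytesWritable, BytesWritable> input) {
    return input.mapToPair(
            new PairFunction<Tuple2<BytesWritable, BytesWritable>,
                             BytesWritable, BytesWritable>() {
                @Override
                public Tuple2<BytesWritable, BytesWritable> call(
                        Tuple2<BytesWritable, BytesWritable> row) {
                    // copyBytes() returns a fresh byte[], so the new Writables
                    // own their data instead of pointing at the reused buffers.
                    return new Tuple2<BytesWritable, BytesWritable>(
                            new BytesWritable(row._1().copyBytes()),
                            new BytesWritable(row._2().copyBytes()));
                }
            });
}
{code}

The reporter's example would then call copyForCaching(input).cache() instead of 
input.cache().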

> Cached Hadoop RDD always return rows with the same value
> --------------------------------------------------------
>
>                 Key: SPARK-3693
>                 URL: https://issues.apache.org/jira/browse/SPARK-3693
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.0
>            Reporter: Xuefu Zhang
>
> While trying out RDD caching, it was found that caching a Hadoop RDD causes data 
> correctness issues. The following code snippet demonstrates the problematic usage:
> {code}
> public final class Test {
>     public static void main(String[] args) throws Exception {
>         SparkConf sparkConf = new SparkConf().setAppName("Test");
>         JavaSparkContext ctx = new JavaSparkContext(sparkConf);
>         ... 
>         JavaPairRDD<BytesWritable, BytesWritable> input =
>                 ctx.hadoopRDD(jobConf, CombineHiveInputClass.class,
>                         WritableComparable.class, Writable.class);
>         input = input.cache();
>         input.foreach(new VoidFunction<Tuple2<BytesWritable, BytesWritable>>() {
>             @Override
>             public void call(Tuple2<BytesWritable, BytesWritable> row) throws Exception {
>                 if (row._1() != null) {
>                     System.out.println("Key: " + row._1());
>                 }
>                 if (row._2() != null) {
>                     System.out.println("Value: " + row._2());
>                 }
>             }
>         });
>         ctx.stop();
>     }
> }
> {code}
> In this case, row._2() always gives the same value. If we disable caching by removing 
> input.cache(), the program gives the expected rows.
> Further analysis shows that MemoryStore (see 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236)
> is storing references to the (key, value) pairs returned by HadoopRDD.getNext() (see 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220),
> but that method always returns the same (key, value) object references; each getNext() 
> call only updates the values inside those objects. When there are no more records, the 
> (key, value) objects are filled with empty strings (no values) in CombineFileRecordReader. 
> Since all pairs in MemoryStore.vector refer to the same key and value objects, all cached 
> values end up NULL.
> Probably MemoryStore should instead store a copy of the <key, value> pair rather 
> than keeping a reference to it.
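
For illustration only (not from the report): the reference-reuse effect in miniature. 
Storing the reused Writable itself, the way MemoryStore stores whatever reference the 
iterator hands it, means every "cached" entry observes only whatever was written last.

{code}
import org.apache.hadoop.io.BytesWritable;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ReuseDemo {
    public static void main(String[] args) {
        BytesWritable value = new BytesWritable();        // buffer the "reader" keeps reusing
        List<BytesWritable> cached = new ArrayList<BytesWritable>();
        for (String s : new String[] {"a", "b", "c"}) {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            value.set(bytes, 0, bytes.length);            // getNext()-style in-place update
            cached.add(value);                            // MemoryStore-style: keep the reference
        }
        // Prints "c" three times instead of "a", "b", "c": every entry is the same object.
        for (BytesWritable w : cached) {
            System.out.println(new String(w.copyBytes(), StandardCharsets.UTF_8));
        }
    }
}
{code}

Copying into a fresh BytesWritable before cached.add(...) restores the expected 
"a", "b", "c", which is what the documented `map`-copy workaround does at the RDD level.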


