[
https://issues.apache.org/jira/browse/SPARK-7002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14503761#comment-14503761
]
Sean Owen commented on SPARK-7002:
----------------------------------
The shuffle data is a sort of hidden, second type of caching that goes on. I
don't know how much it's supposed to be exposed. My hunch is that if there's
already an easy API to access this info, go ahead and propose adding it to the
debug string, but if it's not otherwise easily accounted for, it may not be
worth adding. It's good to know that there is a logic to what is happening, at
least, rather than a bug.
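
To illustrate: the behavior in the report is consistent with shuffle-output reuse. Once the shuffle behind rdd3 has run, later actions on rdd3 are served from the retained map-stage output, so rdd2's compute is never re-invoked and a re-applied persist() never gets a chance to write blocks. A minimal sketch of the reporter's scenario (assuming a spark-shell with a SparkContext named sc):

{code}
import org.apache.spark.storage.StorageLevel

val rdd1 = sc.parallelize(1 to 3)
val rdd2 = rdd1.map(x => (x, x + 1))
val rdd3 = rdd2.reduceByKey(_ + _)

rdd2.persist(StorageLevel.DISK_ONLY)
rdd3.collect()   // runs the map stage; rdd2's blocks are written to disk

rdd2.unpersist()
rdd2.persist(StorageLevel.DISK_ONLY)
rdd3.collect()   // map stage is skipped (shuffle files are reused), so
                 // rdd2 is never recomputed and nothing is re-persisted
rdd2.count()     // an action on rdd2 itself forces recomputation,
                 // which materializes the persisted blocks again
{code}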
> Persist on RDD fails the second time if the action is called on a child RDD
> without showing a FAILED message
> ------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-7002
> URL: https://issues.apache.org/jira/browse/SPARK-7002
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.3.0
> Environment: Platform: Power8
> OS: Ubuntu 14.10
> Java: java-8-openjdk-ppc64el
> Reporter: Tom Hubregtsen
> Priority: Minor
> Labels: disk, persist, unpersist
>
> The major issue is: persist on an RDD fails the second time if the action is
> called on a child RDD, without showing a FAILED message. This is pointed out
> at 2) below.
> In addition:
> toDebugString on a child RDD does not show that the parent RDD is [Disk
> Serialized 1x Replicated]. This is pointed out at 1) below.
> Note: I am persisting to disk (DISK_ONLY) to validate whether the RDD is
> physically stored, as I did not want to rely solely on a missing line in
> .toDebugString (see comments in the trace)
> {code}
> scala> val rdd1 = sc.parallelize(List(1,2,3))
> scala> val rdd2 = rdd1.map(x => (x,x+1))
> scala> val rdd3 = rdd2.reduceByKey( (x,y) => x+y)
> scala> import org.apache.spark.storage.StorageLevel
> scala> rdd2.persist(StorageLevel.DISK_ONLY)
> scala> rdd3.collect()
> scala> rdd2.toDebugString
> res4: String =
> (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x
> Replicated]
> |  CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B;
> DiskSize: 802.0 B
> |  ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk
> Serialized 1x Replicated]
> scala> rdd3.toDebugString
> res5: String =
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
> +-(100) MapPartitionsRDD[1] at map at <console>:23 []
> |  CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B;
> DiskSize: 802.0 B
> |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // 1) rdd3 does not show that the other RDDs are [Disk Serialized 1x
> Replicated], but the data is on disk. This is verified by
> // a) the line starting with CachedPartitions
> // b) a find in spark_local_dir: "find . -name "*" | grep rdd" returns
> "./spark-b39bcf9b-e7d7-4284-bdd2-1be7ac3cacef/blockmgr-4f4c0b1c-b47a-4972-b364-7179ea6e0873/1f/rdd_4_*",
> where * is the partition index
> scala> rdd2.unpersist()
> scala> rdd2.toDebugString
> res8: String =
> (100) MapPartitionsRDD[1] at map at <console>:23 []
> |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
> scala> rdd3.toDebugString
> res9: String =
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
> +-(100) MapPartitionsRDD[1] at map at <console>:23 []
> |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // successfully unpersisted, also not visible on disk
> scala> rdd2.persist(StorageLevel.DISK_ONLY)
> scala> rdd3.collect()
> scala> rdd2.toDebugString
> res18: String =
> (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x
> Replicated]
> |  ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk
> Serialized 1x Replicated]
> scala> rdd3.toDebugString
> res19: String =
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
> +-(100) MapPartitionsRDD[1] at map at <console>:23 []
> |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // 2) The data is not visible on disk through the find command previously
> mentioned, and is also not mentioned in the toDebugString (no line starting
> with CachedPartitions, even though [Disk Serialized 1x Replicated] is
> mentioned). It does work when you call the action on the actual RDD:
> scala> rdd2.collect()
> scala> rdd2.toDebugString
> res21: String =
> (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x
> Replicated]
> |  CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B;
> DiskSize: 802.0 B
> |  ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk
> Serialized 1x Replicated]
> scala> rdd3.toDebugString
> res22: String =
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
> +-(100) MapPartitionsRDD[1] at map at <console>:23 []
> |  CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B;
> DiskSize: 802.0 B
> |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // Data appears on disk again (using the find command previously mentioned),
> and the line with CachedPartitions is back in the .toDebugString
> {code}
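
As an aside, persistence can also be checked programmatically instead of grepping spark_local_dir. A hedged sketch using public/developer APIs that exist in Spark 1.3 (sc is the spark-shell SparkContext; getRDDStorageInfo is a developer API and only lists RDDs with materialized blocks, which is exactly the distinction at issue here between a *requested* storage level and *actually cached* partitions):

{code}
rdd2.getStorageLevel                    // the requested level (DISK_ONLY here),
                                        // set as soon as persist() is called
sc.getPersistentRDDs.contains(rdd2.id)  // true once persist() has been called,
                                        // whether or not blocks exist yet
sc.getRDDStorageInfo                    // only RDDs with cached blocks appear;
  .find(_.id == rdd2.id)                // absence here means nothing was
  .foreach(info => println(info.diskSize)) // actually written to disk
{code}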
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)