[ https://issues.apache.org/jira/browse/SPARK-5360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen updated SPARK-5360:
-----------------------------
Component/s: Spark Core
> For CoGroupedRDD, rdds for narrow dependencies and shuffle handles are
> included twice in serialized task
> --------------------------------------------------------------------------------------------------------
>
> Key: SPARK-5360
> URL: https://issues.apache.org/jira/browse/SPARK-5360
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.2.0
> Reporter: Kay Ousterhout
> Assignee: Kay Ousterhout
> Priority: Minor
>
> CoGroupPartition, part of CoGroupedRDD, includes references to each RDD that
> the CoGroupedRDD narrowly depends on, and a reference to the ShuffleHandle.
> The partition is serialized separately from the RDD, so when the RDD and
> partition arrive on the worker, the references in the partition and in the
> RDD no longer point to the same object.
> This is a relatively minor performance issue (the closure can be 2x larger
> than it needs to be because the RDDs and shuffle handles are serialized
> twice; see numbers below) but is more annoying as a developer issue (which
> is how I ran into it): if any state is stored in the RDD or ShuffleHandle on
> the worker side, subtle bugs can appear because the references to the RDD /
> ShuffleHandle in the RDD and in the partition point to separate objects. I'm
> not sure if this is enough of a potential future problem to justify fixing
> this old and central part of the code, so I'm hoping to get input from
> others here.
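>
> To make the failure mode concrete, here is a minimal sketch (plain Scala,
> not Spark code; Handle, Part, and Rdd are hypothetical stand-ins for
> ShuffleHandle, CoGroupPartition, and the RDD) showing that two separate
> Java-serialization passes duplicate a shared reference, while a single pass
> preserves it:
>
> import java.io._
>
> // Hypothetical stand-ins for ShuffleHandle, CoGroupPartition, and the RDD.
> class Handle extends Serializable
> class Part(val handle: Handle) extends Serializable
> class Rdd(val handle: Handle) extends Serializable
>
> // Serialize and deserialize an object with plain Java serialization.
> def roundTrip[T <: AnyRef](obj: T): T = {
>   val bytes = new ByteArrayOutputStream()
>   val out = new ObjectOutputStream(bytes)
>   out.writeObject(obj)
>   out.close()
>   new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
>     .readObject().asInstanceOf[T]
> }
>
> val handle = new Handle
> val rdd = new Rdd(handle)
> val part = new Part(handle)
>
> // Serialized together, the shared reference survives the round trip:
> val (rdd1, part1) = roundTrip((rdd, part))
> println(rdd1.handle eq part1.handle) // true
>
> // Serialized separately (as the RDD and the partition are for a task),
> // each copy deserializes its own Handle:
> val (rdd2, part2) = (roundTrip(rdd), roundTrip(part))
> println(rdd2.handle eq part2.handle) // false
>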
> I did some simple experiments to see how much this affects closure size. For
> this example:
> $ val a = sc.parallelize(1 to 10).map((_, 1))
> $ val b = sc.parallelize(1 to 2).map(x => (x, 2*x))
> $ a.cogroup(b).collect()
> the closure was 1902 bytes with current Spark, and 1129 bytes after my
> change. The difference comes from eliminating duplicate serialization of the
> shuffle handle.
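>
> One rough way to reproduce numbers like these is to Java-serialize the
> CoGroupPartition directly (serializedSize below is a helper defined just
> for illustration, not a Spark API; the byte counts will differ somewhat
> from the full task-closure sizes above):
>
> import java.io.{ByteArrayOutputStream, ObjectOutputStream}
>
> // Size of an object graph under plain Java serialization.
> def serializedSize(obj: AnyRef): Int = {
>   val bytes = new ByteArrayOutputStream()
>   val out = new ObjectOutputStream(bytes)
>   out.writeObject(obj)
>   out.close()
>   bytes.size()
> }
>
> // The CoGroupPartition is what carries the duplicated references:
> println(serializedSize(a.cogroup(b).partitions(0)))
>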
> For this example:
> $ val sortedA = a.sortByKey()
> $ val sortedB = b.sortByKey()
> $ sortedA.cogroup(sortedB).collect()
> the closure was 3491 bytes with current Spark, and 1333 bytes after my
> change. Here, the difference comes from eliminating duplicate serialization
> of the two RDDs for the narrow dependencies.
> The ShuffleHandle includes the ShuffleDependency, so this difference will get
> larger if a ShuffleDependency includes a serializer, a key ordering, or an
> aggregator (all set to None by default). However, the difference is not
> affected by the size of the function the user specifies, which (based on my
> understanding) is typically the source of large task closures.
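>
> For reference, those fields are constructor parameters of ShuffleDependency,
> roughly as follows (paraphrased from the Spark 1.2 source,
> core/src/main/scala/org/apache/spark/Dependency.scala; see the source for
> the exact definition):
>
> class ShuffleDependency[K, V, C](
>     @transient _rdd: RDD[_ <: Product2[K, V]],
>     val partitioner: Partitioner,
>     val serializer: Option[Serializer] = None,
>     val keyOrdering: Option[Ordering[K]] = None,
>     val aggregator: Option[Aggregator[K, V, C]] = None,
>     val mapSideCombine: Boolean = false)
>   extends Dependency[Product2[K, V]]
>
> Any non-None values here ride along with the ShuffleHandle, so they would
> currently be serialized twice as well.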