[
https://issues.apache.org/jira/browse/SPARK-5360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kay Ousterhout updated SPARK-5360:
----------------------------------
Description:
CoGroupPartition, part of CoGroupedRDD, includes references to each RDD that
the CoGroupedRDD narrowly depends on, and a reference to the ShuffleHandle.
The partition is serialized separately from the RDD, so when the RDD and
partition arrive on the worker, the references in the partition and in the RDD
no longer point to the same object.
This is a relatively minor performance issue (the closure can be 2x larger than
it needs to be because the RDDs and shuffle handles referenced by the
partitions are serialized twice; see numbers below) but is more annoying as a
developer issue (this is how I ran into it): if any state is stored in the RDD
or ShuffleHandle on the worker side, subtle bugs can appear because the
references to the RDD / ShuffleHandle in the RDD and in the partition point to
separate objects. I'm not sure if this is enough of a potential future problem
to justify changing this old and central part of the code, so I'm hoping to get
input from others here.
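To illustrate the identity problem described above, here is a minimal sketch
using plain Java serialization outside of Spark. The classes Handle, FakeRdd
and FakePartition are hypothetical stand-ins invented for this example, not
Spark's actual classes, and the sketch assumes only that the two objects are
written to separate streams, as the description says happens for the RDD and
its partition.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins for ShuffleHandle, CoGroupedRDD and CoGroupPartition.
class Handle(val shuffleId: Int) extends Serializable
class FakeRdd(val handle: Handle) extends Serializable
class FakePartition(val handle: Handle) extends Serializable

object SeparateSerialization {
  // Write one object graph to its own stream and return the bytes.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  // Read back a single object graph from the given bytes.
  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val handle = new Handle(0)
    val rdd = new FakeRdd(handle)
    val partition = new FakePartition(handle)

    // Before serialization both objects share one handle instance.
    println(rdd.handle eq partition.handle) // true

    // Serialized separately: each stream carries its own copy of the handle.
    val rdd2 = deserialize[FakeRdd](serialize(rdd))
    val partition2 = deserialize[FakePartition](serialize(partition))
    println(rdd2.handle eq partition2.handle) // false

    // Serialized together in one stream: the shared reference is preserved.
    val (rdd3, partition3) =
      deserialize[(FakeRdd, FakePartition)](serialize((rdd, partition)))
    println(rdd3.handle eq partition3.handle) // true
  }
}
```

Any state written to rdd2.handle after deserialization would not be visible
through partition2.handle, which is the kind of subtle bug described above.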
I did some simple experiments to see how much this affects closure size. For
this example:
$ val a = sc.parallelize(1 to 10).map((_, 1))
$ val b = sc.parallelize(1 to 2).map(x => (x, 2*x))
$ a.cogroup(b).collect()
the closure was 1902 bytes with current Spark, and 1129 bytes after my change.
The difference comes from eliminating duplicate serialization of the shuffle
handle.
For this example:
$ val sortedA = a.sortByKey()
$ val sortedB = b.sortByKey()
$ sortedA.cogroup(sortedB).collect()
the closure was 3491 bytes with current Spark, and 1333 bytes after my change.
Here, the difference comes from eliminating duplicate serialization of the two
RDDs for the narrow dependencies.
The ShuffleHandle includes the ShuffleDependency, so this difference will get
larger if a ShuffleDependency includes a serializer, a key ordering, or an
aggregator (all set to None by default). However, the difference is not
affected by the size of the function the user specifies, which (based on my
understanding) is typically the source of large task closures.
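The scaling argument above can be sketched outside of Spark with plain Java
serialization. SizedDependency, SizedHandle, SizedRdd and SizedPartition are
hypothetical stand-ins invented for this example, and the single optional
field "extras" is an assumption standing in for the optional serializer, key
ordering and aggregator. This is not how the numbers above were measured; it
only shows that the cost of the duplicate copy grows with the size of the
shared handle.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-ins; "extras" models the optional fields of a dependency.
class SizedDependency(val extras: Option[Array[Byte]]) extends Serializable
class SizedHandle(val dep: SizedDependency) extends Serializable
class SizedRdd(val handle: SizedHandle) extends Serializable
class SizedPartition(val handle: SizedHandle) extends Serializable

object DuplicateCost {
  // Number of bytes produced when obj is written to its own stream.
  def size(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  // Extra bytes paid when the RDD and partition are serialized separately
  // instead of together in one stream, where the handle is written once.
  def overhead(extras: Option[Array[Byte]]): Int = {
    val handle = new SizedHandle(new SizedDependency(extras))
    val rdd = new SizedRdd(handle)
    val partition = new SizedPartition(handle)
    val separate = size(rdd) + size(partition)
    val together = size((rdd, partition))
    separate - together
  }

  def main(args: Array[String]): Unit = {
    println(s"extra bytes, default dependency: ${overhead(None)}")
    println(s"extra bytes, larger dependency:  ${overhead(Some(new Array[Byte](1000)))}")
  }
}
```

The user's function does not appear in either object here, which matches the
point that its size does not change the difference.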
> For CoGroupedRDD, rdds for narrow dependencies and shuffle handles are
> included twice in serialized task
> --------------------------------------------------------------------------------------------------------
>
> Key: SPARK-5360
> URL: https://issues.apache.org/jira/browse/SPARK-5360
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.2.0
> Reporter: Kay Ousterhout
> Assignee: Kay Ousterhout
> Priority: Minor