Updated Branches: refs/heads/master 078049877 -> 87676a6af
Memoize preferred locations in ZippedPartitionsBaseRDD so preferred
location computation doesn't lead to exponential explosion.

(cherry picked from commit e36fe55a031d2c01c9d7c5d85965951c681a0c74)

Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9cf7f31e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9cf7f31e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9cf7f31e

Branch: refs/heads/master
Commit: 9cf7f31e4d4e542b88b6a474bdf08d07fdd3652c
Parents: 743a31a
Author: Reynold Xin <[email protected]>
Authored: Sat Nov 30 18:07:36 2013 -0800
Committer: Reynold Xin <[email protected]>
Committed: Sat Nov 30 18:10:52 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/rdd/ZippedPartitionsRDD.scala | 27 ++++++++------------
 1 file changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9cf7f31e/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index faeb316..a97d2a0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -22,7 +22,8 @@ import java.io.{ObjectOutputStream, IOException}
 
 private[spark] class ZippedPartitionsPartition(
     idx: Int,
-    @transient rdds: Seq[RDD[_]])
+    @transient rdds: Seq[RDD[_]],
+    @transient val preferredLocations: Seq[String])
   extends Partition {
 
   override val index: Int = idx
@@ -47,27 +48,21 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
     if (preservesPartitioning) firstParent[Any].partitioner else None
 
   override def getPartitions: Array[Partition] = {
-    val sizes = rdds.map(x => x.partitions.size)
-    if (!sizes.forall(x => x == sizes(0))) {
+    val numParts = rdds.head.partitions.size
+    if (!rdds.forall(rdd => rdd.partitions.size == numParts)) {
       throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
     }
-    val array = new Array[Partition](sizes(0))
-    for (i <- 0 until sizes(0)) {
-      array(i) = new ZippedPartitionsPartition(i, rdds)
+    Array.tabulate[Partition](numParts) { i =>
+      val prefs = rdds.map(rdd => rdd.preferredLocations(rdd.partitions(i)))
+      // Check whether there are any hosts that match all RDDs; otherwise return the union
+      val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
+      val locs = if (!exactMatchLocations.isEmpty) exactMatchLocations else prefs.flatten.distinct
+      new ZippedPartitionsPartition(i, rdds, locs)
     }
-    array
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
-    val parts = s.asInstanceOf[ZippedPartitionsPartition].partitions
-    val prefs = rdds.zip(parts).map { case (rdd, p) => rdd.preferredLocations(p) }
-    // Check whether there are any hosts that match all RDDs; otherwise return the union
-    val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
-    if (!exactMatchLocations.isEmpty) {
-      exactMatchLocations
-    } else {
-      prefs.flatten.distinct
-    }
+    s.asInstanceOf[ZippedPartitionsPartition].preferredLocations
   }
 
   override def clearDependencies() {
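----------------------------------------------------------------------

As a rough illustration of the "exponential explosion" the commit message
refers to, here is a minimal, self-contained Scala sketch. It is not Spark
code: ToyRdd, locationsUncached and locationsCached are invented names used
only to show why recomputing preferred locations on every call blows up when
zipped RDDs are nested, and how computing them once at construction time, as
the patch does via ZippedPartitionsPartition, keeps the cost to one pass per
node.

object MemoizationSketch {

  // Counts how many times the uncached lookup runs.
  var calls = 0

  // Toy stand-in for a zipped RDD: its preferred locations are the
  // intersection of its parents' locations, or their union when the
  // intersection is empty (mirroring the logic in the patch above).
  class ToyRdd(parents: Seq[ToyRdd], hosts: Seq[String] = Nil) {

    // Uncached: every call re-asks every parent, so n nested zips trigger
    // on the order of 2^n lookups (the exponential explosion).
    def locationsUncached: Seq[String] = {
      calls += 1
      if (parents.isEmpty) hosts
      else {
        val prefs = parents.map(_.locationsUncached)
        val exact = prefs.reduce((x, y) => x.intersect(y))
        if (exact.nonEmpty) exact else prefs.flatten.distinct
      }
    }

    // Memoized: computed once when the node is built, analogous to storing
    // the result in ZippedPartitionsPartition.
    val locationsCached: Seq[String] =
      if (parents.isEmpty) hosts
      else {
        val prefs = parents.map(_.locationsCached)
        val exact = prefs.reduce((x, y) => x.intersect(y))
        if (exact.nonEmpty) exact else prefs.flatten.distinct
      }
  }

  def main(args: Array[String]): Unit = {
    val leaf = new ToyRdd(Nil, Seq("host1", "host2"))
    // Each level zips the previous level with itself, 20 levels deep.
    val top = (1 to 20).foldLeft(leaf)((rdd, _) => new ToyRdd(Seq(rdd, rdd)))
    calls = 0
    top.locationsUncached
    println(s"uncached lookups: $calls")                  // about 2^21 - 1
    println(s"memoized result: ${top.locationsCached}")   // one pass per node
  }
}

With 20 levels of nesting the uncached version performs roughly two million
lookups, while the memoized version computes each node's locations exactly
once while the chain is being built.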
