Another approach I found:

First, I define a PartitionsRDD class that exposes only a given range of the
parent RDD's partitions:
----------------------------------------------------- 
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Wraps one parent partition under a new index in the child RDD.
case class PartitionsRDDPartition(index: Int, origSplit: Partition) extends Partition

// An RDD view over a contiguous range of the parent's partitions.
class PartitionsRDD[U: ClassTag](var prev: RDD[U], drop: Int, take: Int)
  extends RDD[U](prev) {

  // Keep only the requested slice of the parent's partitions, re-indexed from 0.
  override def getPartitions: Array[Partition] =
    prev.partitions.drop(drop).take(take).zipWithIndex.map {
      case (split, idx) => (PartitionsRDDPartition(idx, split): Partition)
    }

  // Child partition i depends on parent partition i + drop (the default
  // one-to-one dependency set up by RDD(prev) would map the indices wrongly).
  override protected def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency(prev) {
      override def getParents(id: Int): Seq[Int] = Seq(id + drop)
    })

  // Compute by delegating to the wrapped parent partition.
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    prev.iterator(split.asInstanceOf[PartitionsRDDPartition].origSplit, context)
}
----------------------------------------------------- 

And then I can create my two RDDs with the following:
----------------------------------------------------- 
// Splits an RDD into its first `hotPartitions` partitions and the rest,
// without scanning or shuffling any data.
def splitByPartition[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  val left  = new PartitionsRDD[T](rdd, 0, hotPartitions)
  val right = new PartitionsRDD[T](rdd, hotPartitions, rdd.partitions.length - hotPartitions)
  (left, right)
}
----------------------------------------------------- 
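
For completeness, here is a minimal usage sketch; the sample data, the
partition counts, and the local master are made up for illustration:
----------------------------------------------------- 
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("split-by-partition-demo").setMaster("local[4]"))

// Hypothetical input: 10 partitions, of which the first 2 are "hot".
val rdd = sc.parallelize(1 to 1000, numSlices = 10)
val (hot, cold) = splitByPartition(rdd, hotPartitions = 2)

println(hot.partitions.length)                       // 2
println(cold.partitions.length)                      // 8
println(hot.count() + cold.count() == rdd.count())   // true
----------------------------------------------------- 

Since the two halves cover disjoint ranges of the parent's partitions, each
parent partition is computed by exactly one side.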

This approach saves a few minutes compared to the one in the previous post
(at least in a local test; I still need to try it on a real cluster).

Any thoughts on this? Is this the right way to do it, or am I missing
something important?


