I'm not sure whether you ever settled on a preferred method for numbering the rows of an RDD, but I just ran into the same problem and came up with the approach below, which works reasonably well on a small dataset. It should generalize easily to non-String RDDs.

import org.apache.spark.rdd.RDD

def addRowNumber(r: RDD[String]): RDD[(Long, String)] = {
  val sc = r.sparkContext

  // First pass: count the rows in each partition.
  val partitionSizes = r.mapPartitionsWithIndex(
    (index, rows) => Iterator((index, rows.size))
  ).collect()

  // Running total of the sizes gives each partition's global start index.
  // Accumulate as Long so the total cannot overflow Int on large datasets.
  val partitionGlobalStartIndex =
    partitionSizes.sortBy(_._1).map(_._2.toLong).scanLeft(0L)(_ + _)
  val startIndexes = sc.broadcast(partitionGlobalStartIndex)

  // Second pass: offset each row's local index by its partition's start index.
  r.mapPartitionsWithIndex((partitionIndex, rows) => {
    val partitionStartIndex = startIndexes.value(partitionIndex)
    rows.zipWithIndex.map { case (row, rowIndex) =>
      (partitionStartIndex + rowIndex, row)
    }
  })
}

On Wed, Jan 1, 2014 at 4:05 AM, Guillaume Pitel <guillaume.pi...@exensa.com> wrote:

> I'm not very comfortable with the idea of generating a rdd from the range
> (it might take a lot of memory), dispatching it to the nodes, then zipping.
>
> You should try and compare the two approaches and give us the performance
> comparison.
>
> Guillaume
>
> Why not use a zipped RDD?
>>
>> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
>>
>
> I do not know why no one else suggested this. Of course it has 3 extra
> loops (one for counting rdd, one for generating the range, one for
> zipping). Apart from this performance problem, any other caveats?
>
>
>>
>> I have used something like this in the past.
>>
>> > val index = sc.parallelize(Range.Long(0, rdd.count, 1),
>> rdd.partitions.size)
>> > val rddWithIndex = rdd.zip(index)
>>
>> If that doesn't work, then you could try zipPartitions as well, since
>> it has slightly more relaxed constraints.
>>
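
To make the offset arithmetic easier to check without a cluster, here is a minimal sketch of the same two-pass idea using plain Scala collections in place of RDD partitions. The object name RowNumberSketch and the Seq-of-Seq representation are assumptions for illustration only, not part of Spark: count each partition, take a running total to get every partition's global start index, then add the local index.

```scala
// Plain Scala collections stand in for RDD partitions here (an assumption
// made for illustration; no Spark dependency is needed to run this).
object RowNumberSketch {
  // Hypothetical helper: number rows across "partitions" in partition order.
  def addRowNumber(partitions: Seq[Seq[String]]): Seq[Seq[(Long, String)]] = {
    // Pass 1: count the rows in each partition.
    val sizes: Seq[Long] = partitions.map(_.size.toLong)
    // The running total is the global index of each partition's first row.
    // scanLeft yields one extra trailing element, the total row count.
    val startIndexes: Seq[Long] = sizes.scanLeft(0L)(_ + _)
    // Pass 2: offset each local index by the partition's start index.
    partitions.zipWithIndex.map { case (rows, partitionIndex) =>
      val start = startIndexes(partitionIndex)
      rows.zipWithIndex.map { case (row, rowIndex) => (start + rowIndex, row) }
    }
  }

  def main(args: Array[String]): Unit = {
    // The empty middle partition contributes zero to the running total.
    val partitions = Seq(Seq("a", "b"), Seq.empty[String], Seq("c", "d", "e"))
    addRowNumber(partitions).flatten.foreach(println)
    // prints (0,a) (1,b) (2,c) (3,d) (4,e), one per line
  }
}
```

Empty partitions are harmless here because they add nothing to the running total, so the numbering stays contiguous.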