Hi Juan, Daniel,

thank you for your explanations. Indeed, I don't have a large number of keys, at least not enough to get the scheduler stuck.
I was using a method quite similar to what you posted, Juan, and yes it works, but I think it would be more efficient not to call filter on each key. So, I was thinking of something like:
- get the iterator of the (K, V) RDD
- distribute each value into a subset by key, and then recreate an RDD from each subset

Because the SparkContext parallelize method cannot be used inside a transformation, I wonder if I could do it by creating a custom RDD and then implementing something like the PairRDDFunctions.lookup method, but of course replacing Seq[V] with an RDD:

def lookup(key: K): Seq[V] = {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf
      } : Seq[V]
      val res = self.context.runJob(self, process, Array(index), false)
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}
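To make the idea a bit more concrete, here is a rough, untested sketch of what I have in mind (the splitByKey name and the whole snippet are just illustrative, and PartitionPruningRDD is a developer API): for each key, prune the RDD down to the single partition that the partitioner assigns to that key, as lookup does, but stay lazy by returning an RDD instead of collecting a Seq[V]:

import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.rdd.RDD._
import scala.reflect.ClassTag

// Illustrative sketch: one RDD per key. With a partitioner, an action on a
// returned RDD only scans the partition that owns its key; without one, it
// falls back to the plain filter-per-key approach.
def splitByKey[K: ClassTag, V: ClassTag](pairRDD: RDD[(K, V)]): Map[K, RDD[V]] = {
  val keys = pairRDD.keys.distinct.collect
  pairRDD.partitioner match {
    case Some(p) =>
      keys.map { k =>
        val index = p.getPartition(k)
        // keep only the partition that can contain k, then filter inside it
        val pruned = PartitionPruningRDD.create(pairRDD, _ == index)
        k -> pruned.filter(_._1 == k).map(_._2)
      }.toMap
    case None =>
      keys.map(k => k -> pairRDD.filter(_._1 == k).map(_._2)).toMap
  }
}

// usage (spark shell), partitioning by key first so the pruning applies:
// val ixs = sc.parallelize(1 to 1000).map(x => (x % 10, x))
//              .partitionBy(new org.apache.spark.HashPartitioner(10))
// val gs = splitByKey(ixs)
// gs(1).count   // should only scan the partition holding key 1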
2015-04-29 15:02 GMT+02:00 Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com>:

> Hi Daniel,
>
> I understood Sébastien was talking about having a high number of keys, I
> guess I was prejudiced by my own problem! :) Anyway I don't think you need
> to use disk or a database to generate an RDD per key, you can use filter,
> which I guess would be more efficient because IO is avoided, especially if
> the RDD was cached. For example:
>
> // in the spark shell
> import org.apache.spark.rdd.RDD
> import org.apache.spark.rdd.RDD._
> import scala.reflect.ClassTag
>
> // generate a map from key to rdd of values
> def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)])
>     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
>   val keys = pairRDD.keys.distinct.collect
>   (for (k <- keys) yield
>     k -> (pairRDD filter(_._1 == k) values)
>   ) toMap
> }
>
> // simple demo
> val xs = sc.parallelize(1 to 1000)
> val ixs = xs map(x => (x % 10, x))
> val gs = groupByKeyToRDDs(ixs)
> gs(1).collect
>
> Just an idea.
>
> Greetings,
>
> Juan Rodriguez
>
>
> 2015-04-29 14:20 GMT+02:00 Daniel Darabos <
> daniel.dara...@lynxanalytics.com>:
>
>> Check out http://stackoverflow.com/a/26051042/3318517. It's a nice
>> method for saving the RDD into separate files by key in a single pass.
>> Then you can read the files into separate RDDs.
>>
>> On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>>> Hi Sébastien,
>>>
>>> I came across a similar problem some time ago; you can see the
>>> discussion in the Spark users mailing list at
>>> http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
>>> My experience was that when you create too many RDDs the Spark
>>> scheduler gets stuck, so if you have many keys in the map you are
>>> creating you'll probably have problems. On the other hand, the latest
>>> example I proposed in that mailing thread was a batch job in which we
>>> start from a single RDD of time-tagged data, transform the RDD into a
>>> list of RDDs corresponding to windows generated according to the time
>>> tag of the records, and then apply an RDD transformation to each window
>>> RDD, like for example KMeans.run of MLlib. This is very similar to what
>>> you propose.
>>> So in my humble opinion the approach of generating thousands of RDDs by
>>> filtering doesn't work, and a new RDD class should be implemented for
>>> this. I have never implemented a custom RDD, but if you want some help
>>> I would be happy to join you in this task.
>>>
>>
>> Sebastien said nothing about thousands of keys. This is a valid problem
>> even if you only have two different keys.
>>
>>> Greetings,
>>>
>>> Juan
>>>
>>>
>>> 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère <s.sou...@gmail.com
>>> >:
>>>
>>> > Hello,
>>> >
>>> > I'm facing a problem with custom RDD transformations.
>>> >
>>> > I would like to transform an RDD[K, V] into a Map[K, RDD[V]], meaning
>>> > a map of RDDs by key.
>>> >
>>> > This would be great, for example, in order to process MLlib clustering
>>> > on V values grouped by K.
>>> >
>>> > I know I could do it using filter() on my RDD as many times as I have
>>> > keys, but I'm afraid this would not be efficient (the entire RDD would
>>> > be read each time, right?). Then, I could mapPartitions over my RDD
>>> > before filtering, but the code ends up being huge...
>>> >
>>> > So, I tried to create a custom RDD to implement a splitByKey(rdd:
>>> > RDD[K, V]): Map[K, RDD[V]] method, which would iterate over the RDD
>>> > only once, but I cannot complete the implementation.
>>> >
>>> > Please, could you tell me first if this is really feasible, and then,
>>> > could you give me some pointers?
>>> >
>>> > Thank you,
>>> > Regards,
>>> > Sebastien
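P.S. About Daniel's pointer above: my understanding of the single-pass save-by-key idea, as a minimal untested sketch (the KeyBasedOutput class, the output path and the string conversion are illustrative and not taken verbatim from the linked answer):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD._

// Routes each record to an output file named after its key, in a single pass.
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  // one directory per key, e.g. <path>/<key>/part-00000
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name
  // keep only the value in the file contents
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

def saveByKey[K, V](pairRDD: RDD[(K, V)], path: String): Unit =
  pairRDD
    .map { case (k, v) => (k.toString, v.toString) }
    .saveAsHadoopFile(path, classOf[String], classOf[String], classOf[KeyBasedOutput])

// afterwards each key can be read back as its own RDD (of strings to re-parse):
// val valuesForKey1 = sc.textFile("/tmp/by-key/1")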