Hi Daniel,

I understood Sébastien was talking about having a high number of keys; I
guess I was prejudiced by my own problem! :) Anyway, I don't think you need
to use disk or a database to generate an RDD per key: you can use filter,
which I'd guess is more efficient because IO is avoided, especially if the
source RDD is cached. For example:

// in the spark shell
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD._  // implicits such as rddToPairRDDFunctions

// Generate a map from each distinct key to an RDD of its values.
def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
  // Collect the distinct keys on the driver, then build one filtered RDD per key.
  val keys = pairRDD.keys.distinct.collect
  keys.map(k => k -> pairRDD.filter(_._1 == k).values).toMap
}

// simple demo
val xs = sc.parallelize(1 to 1000)
val ixs = xs.map(x => (x % 10, x))  // key each number by its last digit
val gs = groupByKeyToRDDs(ixs)
gs(1).collect                       // all the values whose key is 1
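
By the way, if you go this route it's worth caching the pair RDD before
splitting it, since otherwise every per-key filter recomputes the whole
lineage. A minimal sketch with the same demo data:

// cache before splitting: ixs is materialized by the first action
// (the keys.distinct.collect inside groupByKeyToRDDs) and the later
// per-key filters then reuse the in-memory copy
ixs.cache()
val gsCached = groupByKeyToRDDs(ixs)
gsCached(2).collect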

Just an idea.

Greetings,

Juan Rodriguez



2015-04-29 14:20 GMT+02:00 Daniel Darabos <daniel.dara...@lynxanalytics.com>:

> Check out http://stackoverflow.com/a/26051042/3318517. It's a nice method
> for saving the RDD into separate files by key in a single pass. Then you
> can read the files into separate RDDs.
>
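
If I remember right, the technique in that answer is based on Hadoop's
MultipleTextOutputFormat. Roughly like this (a sketch from memory; the
output path and class name are illustrative):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // drop the key from the output records and use it as the file name instead
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString
}

ixs.saveAsHadoopFile("/tmp/by-key", classOf[Any], classOf[Any],
  classOf[RDDMultipleTextOutputFormat])
// afterwards, e.g. sc.textFile("/tmp/by-key/1") reads back the records for key 1
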
> On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Sébastien,
>>
>> I ran into a similar problem some time ago; you can see the discussion
>> in the Spark users mailing list at
>>
>> http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
>>
>> My experience was that when you create too many RDDs the Spark
>> scheduler gets stuck, so if the map you are creating has many keys you
>> will probably have problems. On the other hand, the latest example I
>> proposed in that thread was a batch job in which we start from a single
>> RDD of time-tagged data, transform it into a list of RDDs, one per time
>> window derived from the time tags of the records, and then apply an RDD
>> transformation to each window RDD, for example KMeans.run from MLlib.
>> That is very similar to what you propose.
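
For the record, that windowing experiment looked roughly like this (just a
sketch; the field names and parameters are made up, and it reuses
groupByKeyToRDDs from above):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// tagged: RDD[(Long, Double)] of (timestamp in ms, measurement)
val windowMs = 60 * 1000L
val byWindow = tagged.map { case (ts, v) => (ts / windowMs, v) }
val windows = groupByKeyToRDDs(byWindow)  // one RDD per time window
val models = windows.mapValues { vs =>
  // cluster each window independently: k = 2, 10 iterations
  KMeans.train(vs.map(v => Vectors.dense(v)), 2, 10)
}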
>> So in my humble opinion the approach of generating thousands of RDDs by
>> filtering doesn't work, and a new RDD class should be implemented for
>> this. I have never implemented a custom RDD, but if you want some help
>> I would be happy to join you in this task.
>>
>
> Sebastien said nothing about thousands of keys. This is a valid problem
> even if you only have two different keys.
>
>> Greetings,
>>
>> Juan
>>
>>
>>
>> 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère <s.sou...@gmail.com
>> >:
>>
>> > Hello,
>> >
>> > I'm facing a problem with custom RDD transformations.
>> >
>> > I would like to transform an RDD[(K, V)] into a Map[K, RDD[V]], that
>> > is, a map of RDDs by key.
>> >
>> > This would be great, for example, in order to run MLlib clustering on
>> > the V values grouped by K.
>> >
>> > I know I could do it by calling filter() on my RDD once per key, but
>> > I'm afraid this would not be efficient (the entire RDD would be read
>> > each time, right?). Alternatively, I could use mapPartitions on my RDD
>> > before filtering, but the code ends up huge...
>> >
>> > So I tried to create a custom RDD implementing a splitByKey(rdd:
>> > RDD[(K, V)]): Map[K, RDD[V]] method, which would iterate over the RDD
>> > only once, but I haven't managed to complete the implementation.
>> >
>> > Please, could you tell me first whether this is really feasible, and
>> > then could you give me some pointers?
>> >
>> > Thank you,
>> > Regards,
>> > Sebastien
>> >
>>
>
>
