Hi Juan, Daniel,

Thank you for your explanations. Indeed, I don't have a large number of keys,
at least not enough to get the scheduler stuck.

I was using a method quite similar to the one you posted, Juan, and yes, it
works, but I think it would be more efficient not to call filter once per
key. So I was thinking of something like:
- get the iterator of the KV RDD
- distribute each value into a subset by key, and then recreate an RDD from
each subset

Because the SparkContext parallelize method cannot be used inside a
transformation, I wonder if I could do it by creating a custom RDD and then
implementing something like the PairRDDFunctions.lookup method, but of
course replacing Seq[V] with an RDD:

def lookup(key: K): Seq[V] = {
    self.partitioner match {
      case Some(p) =>
        val index = p.getPartition(key)
        val process = (it: Iterator[(K, V)]) => {
          val buf = new ArrayBuffer[V]
          for (pair <- it if pair._1 == key) {
            buf += pair._2
          }
          buf
        } : Seq[V]
        val res = self.context.runJob(self, process, Array(index), false)
        res(0)
      case None =>
        self.filter(_._1 == key).map(_._2).collect()
    }
  }
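
For what it's worth, here is a minimal sketch of the single-pass idea that
avoids writing a custom RDD. It assumes a small number of distinct keys.
KeyIndexPartitioner is a hypothetical helper name, and splitByKey is just the
name from my first message; neither is part of the Spark API. The sketch
gives each key its own partition with one partitionBy, then uses
PartitionPruningRDD (a developer API in org.apache.spark.rdd) to expose each
partition as its own RDD:

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import scala.reflect.ClassTag

// Hypothetical helper: sends every distinct key to its own partition.
class KeyIndexPartitioner[K](index: Map[K, Int]) extends Partitioner {
  override def numPartitions: Int = index.size
  override def getPartition(key: Any): Int = index(key.asInstanceOf[K])
}

def splitByKey[K: ClassTag, V: ClassTag](pairRDD: RDD[(K, V)]): Map[K, RDD[V]] = {
  // One job to discover the keys (assumed to be few).
  val index: Map[K, Int] = pairRDD.keys.distinct().collect().zipWithIndex.toMap
  // One shuffle: afterwards partition i holds exactly the records of key i.
  // Cached so that repeated use by the per-key RDDs is cheap.
  val partitioned = pairRDD.partitionBy(new KeyIndexPartitioner(index)).cache()
  index.map { case (k, i) =>
    // Keep only partition i, so the per-key RDD never scans the other keys.
    val pruned: RDD[(K, V)] = PartitionPruningRDD.create(partitioned, (p: Int) => p == i)
    k -> pruned.values
  }
}
```

With Juan's demo data, splitByKey(ixs)(1).collect should return the same
values as gs(1).collect, but each per-key RDD reads a single partition
instead of filtering the whole RDD.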


2015-04-29 15:02 GMT+02:00 Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com>:

> Hi Daniel,
>
> I understood Sébastien was talking about having a high number of keys; I
> guess I was prejudiced by my own problem! :) Anyway, I don't think you need
> to use disk or a database to generate an RDD per key. You can use filter,
> which I guess would be more efficient because IO is avoided, especially if
> the RDD is cached. For example:
>
> // in the spark shell
> import org.apache.spark.rdd.RDD
> import org.apache.spark.rdd.RDD._
> import scala.reflect.ClassTag
>
> // generate a map from key to rdd of values
> def groupByKeyToRDDs[K, V](pairRDD: RDD[(K, V)])(implicit kt: ClassTag[K],
>     vt: ClassTag[V], ord: Ordering[K]): Map[K, RDD[V]] = {
>   val keys = pairRDD.keys.distinct().collect()
>   // one filter over the whole RDD per distinct key
>   keys.map(k => k -> pairRDD.filter(_._1 == k).values).toMap
> }
>
> // simple demo
> val xs = sc.parallelize(1 to 1000)
> val ixs = xs map(x => (x % 10, x))
> val gs = groupByKeyToRDDs(ixs)
> gs(1).collect
>
> Just an idea.
>
> Greetings,
>
> Juan Rodriguez
>
>
>
> 2015-04-29 14:20 GMT+02:00 Daniel Darabos <
> daniel.dara...@lynxanalytics.com>:
>
>> Check out http://stackoverflow.com/a/26051042/3318517. It's a nice
>> method for saving the RDD into separate files by key in a single pass. Then
>> you can read the files into separate RDDs.
>>
>> On Wed, Apr 29, 2015 at 2:10 PM, Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>>> Hi Sébastien,
>>>
>>> I ran into a similar problem some time ago; you can see the discussion
>>> in the Spark users mailing list at
>>>
>>> http://markmail.org/message/fudmem4yy63p62ar#query:+page:1+mid:qv4gw6czf6lb6hpq+state:results
>>>
>>> My experience was that when you create too many RDDs the Spark
>>> scheduler gets stuck, so if you have many keys in the map you are
>>> creating you'll probably have problems. On the other hand, the latest
>>> example I proposed in that mailing thread was a batch job in which we
>>> start from a single RDD of time-tagged data, transform the RDD into a
>>> list of RDDs corresponding to windows generated according to the time
>>> tag of the records, and then apply an RDD transformation to each window
>>> RDD, for example KMeans.run of MLlib. This is very similar to what you
>>> propose.
>>> So in my humble opinion the approach of generating thousands of RDDs by
>>> filtering doesn't work, and a new RDD class should be implemented for
>>> this. I have never implemented a custom RDD, but if you want some help I
>>> would be happy to join you in this task.
>>>
>>
>> Sebastien said nothing about thousands of keys. This is a valid problem
>> even if you only have two different keys.
>>
>>> Greetings,
>>>
>>> Juan
>>>
>>>
>>>
>>> 2015-04-29 12:56 GMT+02:00 Sébastien Soubré-Lanabère <s.sou...@gmail.com>:
>>>
>>> > Hello,
>>> >
>>> > I'm facing a problem with custom RDD transformations.
>>> >
>>> > I would like to transform an RDD[(K, V)] into a Map[K, RDD[V]], meaning
>>> > a map of RDDs by key.
>>> >
>>> > This would be great, for example, in order to run MLlib clustering on
>>> > the V values grouped by K.
>>> >
>>> > I know I could do it by calling filter() on my RDD as many times as I
>>> > have keys, but I'm afraid this would not be efficient (the entire RDD
>>> > would be read each time, right?). Alternatively, I could use
>>> > mapPartitions on my RDD before filtering, but the code ends up huge...
>>> >
>>> > So I tried to create a CustomRDD to implement a
>>> > splitByKey(rdd: RDD[(K, V)]): Map[K, RDD[V]] method, which would iterate
>>> > over the RDD only once, but I have not managed to get it working.
>>> >
>>> > Please, could you tell me first whether this is really feasible, and
>>> > then, could you give me some pointers?
>>> >
>>> > Thank you,
>>> > Regards,
>>> > Sebastien
>>> >
>>>
>>
>>
>
