This is just a deficiency of the api, imo. I agree: mapValues could
definitely be a function (K, V)=>V1. The option isn't set by the function,
it's on the RDD. So you could look at the code and do this.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
def mapValues[U](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}
What you want:
def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t)) },
preservesPartitioning = true)
}
One of the nice things about spark is that making such new operators is
very easy :)
2015-03-26 17:54 GMT-04:00 Zhan Zhang <[email protected]>:
> Thanks Jonathan. You are right regarding rewrite the example.
>
> I mean providing such option to developer so that it is controllable.
> The example may seems silly, and I don’t know the use cases.
>
> But for example, if I also want to operate both the key and value part to
> generate some new value with keeping key part untouched. Then mapValues may
> not be able to do this.
>
> Changing the code to allow this is trivial, but I don’t know whether
> there is some special reason behind this.
>
> Thanks.
>
> Zhan Zhang
>
>
>
>
> On Mar 26, 2015, at 2:49 PM, Jonathan Coveney <[email protected]> wrote:
>
> I believe if you do the following:
>
>
> sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString
>
> (8) MapPartitionsRDD[34] at reduceByKey at <console>:23 []
> | MapPartitionsRDD[33] at mapValues at <console>:23 []
> | ShuffledRDD[32] at reduceByKey at <console>:23 []
> +-(8) MapPartitionsRDD[31] at map at <console>:23 []
> | ParallelCollectionRDD[30] at parallelize at <console>:23 []
>
> The difference is that spark has no way to know that your map closure
> doesn't change the key. if you only use mapValues, it does. Pretty cool
> that they optimized that :)
>
> 2015-03-26 17:44 GMT-04:00 Zhan Zhang <[email protected]>:
>
>> Hi Folks,
>>
>> Does anybody know what is the reason not allowing preserverPartitioning
>> in RDD.map? Do I miss something here?
>>
>> Following example involves two shuffles. I think if preservePartitioning
>> is allowed, we can avoid the second one, right?
>>
>> val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
>> val r2 = r1.map((_, 1))
>> val r3 = r2.reduceByKey(_+_)
>> val r4 = r3.map(x=>(x._1, x._2 + 1))
>> val r5 = r4.reduceByKey(_+_)
>> r5.collect.foreach(println)
>>
>> scala> r5.toDebugString
>> res2: String =
>> (8) ShuffledRDD[4] at reduceByKey at <console>:29 []
>> +-(8) MapPartitionsRDD[3] at map at <console>:27 []
>> | ShuffledRDD[2] at reduceByKey at <console>:25 []
>> +-(8) MapPartitionsRDD[1] at map at <console>:23 []
>> | ParallelCollectionRDD[0] at parallelize at <console>:21 []
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [email protected]
>> For additional commands, e-mail: [email protected]
>>
>>
>
>