Thanks all for the quick response.
Zhan Zhang

On Mar 26, 2015, at 3:14 PM, Patrick Wendell <pwend...@gmail.com> wrote:

> I think we have a version of mapPartitions that allows you to tell
> Spark the partitioning is preserved:
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L639
>
> We could also add a map function that does the same. Or you can just
> write your map using an iterator.
>
> - Patrick
>
> On Thu, Mar 26, 2015 at 3:07 PM, Jonathan Coveney <jcove...@gmail.com> wrote:
>> This is just a deficiency of the API, IMO. I agree: mapValues could
>> definitely take a function (K, V) => V1. The option isn't set by the
>> function; it's on the RDD. So you could look at the code and do this:
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>>
>> def mapValues[U](f: V => U): RDD[(K, U)] = {
>>   val cleanF = self.context.clean(f)
>>   new MapPartitionsRDD[(K, U), (K, V)](self,
>>     (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
>>     preservesPartitioning = true)
>> }
>>
>> What you want:
>>
>> def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
>>   val cleanF = self.context.clean(f)
>>   new MapPartitionsRDD[(K, U), (K, V)](self,
>>     (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(k, v)) },
>>     preservesPartitioning = true)
>> }
>>
>> One of the nice things about Spark is that making such new operators is
>> very easy :)
>>
>> 2015-03-26 17:54 GMT-04:00 Zhan Zhang <zzh...@hortonworks.com>:
>>
>>> Thanks Jonathan. You are right about rewriting the example.
>>>
>>> I mean providing such an option to the developer so that it is
>>> controllable. The example may seem silly, and I don't know the use cases.
>>>
>>> But, for example, if I also want to operate on both the key and the value
>>> to generate some new value while keeping the key untouched, then mapValues
>>> may not be able to do this.
>>>
>>> Changing the code to allow this is trivial, but I don't know whether there
>>> is some special reason behind this.
>>>
>>> Thanks.
>>>
>>> Zhan Zhang
>>>
>>> On Mar 26, 2015, at 2:49 PM, Jonathan Coveney <jcove...@gmail.com> wrote:
>>>
>>> I believe if you do the following:
>>>
>>> sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString
>>>
>>> (8) MapPartitionsRDD[34] at reduceByKey at <console>:23 []
>>>  |  MapPartitionsRDD[33] at mapValues at <console>:23 []
>>>  |  ShuffledRDD[32] at reduceByKey at <console>:23 []
>>>  +-(8) MapPartitionsRDD[31] at map at <console>:23 []
>>>     |  ParallelCollectionRDD[30] at parallelize at <console>:23 []
>>>
>>> The difference is that Spark has no way to know that your map closure
>>> doesn't change the key; if you only use mapValues, it does. Pretty cool
>>> that they optimized that :)
>>>
>>> 2015-03-26 17:44 GMT-04:00 Zhan Zhang <zzh...@hortonworks.com>:
>>>>
>>>> Hi Folks,
>>>>
>>>> Does anybody know the reason for not allowing preservesPartitioning
>>>> in RDD.map? Am I missing something here?
>>>>
>>>> The following example involves two shuffles. I think if
>>>> preservesPartitioning were allowed, we could avoid the second one, right?
>>>>
>>>> val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
>>>> val r2 = r1.map((_, 1))
>>>> val r3 = r2.reduceByKey(_+_)
>>>> val r4 = r3.map(x => (x._1, x._2 + 1))
>>>> val r5 = r4.reduceByKey(_+_)
>>>> r5.collect.foreach(println)
>>>>
>>>> scala> r5.toDebugString
>>>> res2: String =
>>>> (8) ShuffledRDD[4] at reduceByKey at <console>:29 []
>>>>  +-(8) MapPartitionsRDD[3] at map at <console>:27 []
>>>>     |  ShuffledRDD[2] at reduceByKey at <console>:25 []
>>>>     +-(8) MapPartitionsRDD[1] at map at <console>:23 []
>>>>        |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
>>>>
>>>> Thanks.
>>>>
>>>> Zhan Zhang
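
For reference, a minimal sketch (not from the thread itself) of the mapPartitions workaround Patrick describes, applied to Zhan's example: replacing the second plain map with mapPartitions(..., preservesPartitioning = true) tells Spark the keys are unchanged, so the partitioner from the first reduceByKey survives and the second reduceByKey should not need another shuffle.

val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
val r2 = r1.map((_, 1))
val r3 = r2.reduceByKey(_+_)
// Declare that this step preserves partitioning; correctness relies on
// the closure really leaving the keys untouched.
val r4 = r3.mapPartitions(
  iter => iter.map { case (k, v) => (k, v + 1) },
  preservesPartitioning = true)
val r5 = r4.reduceByKey(_+_)   // expected to reuse r3's partitioner, no extra shuffle
r5.collect.foreach(println)

And, in the spirit of Jonathan's remark that adding such operators is easy, a user-side sketch of a mapValues variant whose function also sees the key; the object and method names (PairRDDExtensions, mapValuesWithKey) are made up here for illustration and are not part of Spark:

import org.apache.spark.rdd.RDD

object PairRDDExtensions {
  // Hypothetical helper: like mapValues, but the function also receives the
  // key; partitioning is declared preserved because the key is passed through.
  implicit class KeyAwareMapValues[K, V](rdd: RDD[(K, V)]) {
    def mapValuesWithKey[U](f: (K, V) => U): RDD[(K, U)] =
      rdd.mapPartitions(
        iter => iter.map { case (k, v) => (k, f(k, v)) },
        preservesPartitioning = true)
  }
}

With import PairRDDExtensions._ in scope, r3.mapValuesWithKey((k, v) => v + 1) should keep the partitioner just as mapValues does.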