Which Spark version are you on?
On Mon, Jan 27, 2014 at 3:12 AM, Mark Hamstra <[email protected]> wrote:

> groupByKey does merge the values associated with the same key in different
> partitions:
>
> scala> val rdd = sc.parallelize(List(1, 1, 1, 1),
>   4).mapPartitionsWithIndex((idx, itr) => List(("foo", idx ->
>   math.random), ("bar", idx -> math.random)).toIterator)
>
> scala> rdd.collect.foreach(println)
>
> (foo,(0,0.7387266457142971))
> (bar,(0,0.06390701080780203))
> (foo,(1,0.3601832111876926))
> (bar,(1,0.5247725435958681))
> (foo,(2,0.7486323021599729))
> (bar,(2,0.9185837845634715))
> (foo,(3,0.17591718413623136))
> (bar,(3,0.12096331089133605))
>
> scala> rdd.groupByKey.collect.foreach(println)
>
> (foo,ArrayBuffer((0,0.8432285514154537), (1,0.3005967566708283),
>   (2,0.6150820518108783), (3,0.4779052219014124)))
> (bar,ArrayBuffer((0,0.8190206253566251), (1,0.3465707665527258),
>   (2,0.5187789456090471), (3,0.9612998198743644)))
>
>
> On Sun, Jan 26, 2014 at 12:22 PM, Archit Thakur
> <[email protected]> wrote:
>
> > Hi,
> >
> > Below is the implementation of groupByKey (v 0.8.0):
> >
> >   def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
> >     def createCombiner(v: V) = ArrayBuffer(v)
> >     def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
> >     val bufs = combineByKey[ArrayBuffer[V]](
> >       createCombiner _, mergeValue _, null, partitioner,
> >       mapSideCombine=false)
> >     bufs.asInstanceOf[RDD[(K, Seq[V])]]
> >   }
> >
> > and combineValuesByKey (Aggregator.scala):
> >
> >   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = {
> >     val combiners = new JHashMap[K, C]
> >     for (kv <- iter) {
> >       val oldC = combiners.get(kv._1)
> >       if (oldC == null) {
> >         combiners.put(kv._1, createCombiner(kv._2))
> >       } else {
> >         combiners.put(kv._1, mergeValue(oldC, kv._2))
> >       }
> >     }
> >     combiners.iterator
> >   }
> >
> > My doubt is why null is being passed for the mergeCombiners closure.
> >
> > If two different partitions have the same key, wouldn't there be a
> > requirement to merge them afterwards?
> >
> > Thanks,
> > Archit.
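
For what it's worth, here is a minimal standalone sketch (not the Spark
source; the String/Int key and value types and the sample pairs are just
placeholders) of what I understand combineValuesByKey to be doing on the
reduce side. With mapSideCombine=false, the shuffle moves the raw (K, V)
pairs rather than per-partition partial combiners, so each reduce task
already sees every value for its keys and a single hash-map pass is enough,
which would explain why the null mergeCombiners closure is never invoked:

    import scala.collection.mutable.ArrayBuffer
    import java.util.{HashMap => JHashMap}

    // Placeholder combiner functions, mirroring what groupByKey passes in.
    def createCombiner(v: Int): ArrayBuffer[Int] = ArrayBuffer(v)
    def mergeValue(buf: ArrayBuffer[Int], v: Int): ArrayBuffer[Int] = buf += v

    // Pretend these raw (K, V) pairs arrived at a single reduce task after
    // the shuffle, even though they came from different map partitions.
    val shuffled = Iterator(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4))

    val combiners = new JHashMap[String, ArrayBuffer[Int]]
    for ((k, v) <- shuffled) {
      val oldC = combiners.get(k)
      if (oldC == null) combiners.put(k, createCombiner(v))
      else combiners.put(k, mergeValue(oldC, v))
    }

    // combiners now maps foo -> ArrayBuffer(1, 3) and bar -> ArrayBuffer(2, 4),
    // without any mergeCombiners step.
    println(combiners)

Presumably groupByKey disables map-side combining in the first place because
collecting values into per-partition ArrayBuffers would not reduce the amount
of data shuffled anyway.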
