Is K, the number of top elements you want to keep, small? If so, you can:

1. Either do it in the driver, on the array that top() returns:

dstream.foreachRDD { rdd =>
   val topK = rdd.top(K)   // an Array, collected to the driver
   // use topK
}

2. Or use the top-K array to create another RDD with makeRDD on the RDD's
SparkContext:

dstream.transform { rdd =>
   val topK = rdd.top(K)
   rdd.context.makeRDD(topK, numPartitions)
}
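
For the (value, count) stream in the quoted question, the second option
might look roughly like the sketch below. This is only an illustration under
some assumptions: the helper name topKByCount is made up, the stream is
assumed to be the DStream[(Long, Long)] that countByValueAndWindow produces,
and the top K is assumed to mean "highest count". Note that a plain
rdd.top(K) on tuples uses the default tuple ordering (first field first), so
the sketch passes an explicit ordering on the count.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper; the name and signature are illustrative only.
// `counts` is assumed to hold (value, count) pairs for each window.
def topKByCount(counts: DStream[(Long, Long)], k: Int): DStream[(Long, Long)] =
  counts.transform { (rdd: RDD[(Long, Long)]) =>
    // top() returns an Array on the driver, largest first; order by the
    // count (second field) instead of the default tuple ordering.
    val topK: Array[(Long, Long)] =
      rdd.top(k)(Ordering.by[(Long, Long), Long](_._2))
    // Turn the small array back into a single-partition RDD so that
    // further DStream operations can be chained on the result.
    rdd.context.parallelize(topK.toSeq, numSlices = 1)
  }
```

Something like val topCounts = topKByCount(counts, 10) would then give a
DStream again, without the repartition(1) and full sort. This only makes
sense while K is small enough for the array to fit comfortably on the driver.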

TD



On Fri, Mar 13, 2015 at 5:58 AM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid>
wrote:

> Hi,
>
> Earlier my code was like the following, but it was slow due to the
> repartition. I want the top K of each window in a stream.
>
> val counts = keyAndValues.map(x =>
> math.round(x._3.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
> val topCounts = counts.repartition(1).map(_.swap).transform(rdd =>
> rdd.sortByKey(false)).map(_.swap).mapPartitions(rdd => rdd.take(10))
>
> So I thought to use dstream.transform(rdd => rdd.top()), but this returns
> an Array rather than an RDD. I have to perform further steps on the
> topCounts dstream.
>
> [ERROR]  found   : Array[(Long, Long)]
> [ERROR]  required: org.apache.spark.rdd.RDD[?]
> [ERROR] val topCounts = counts.transform(rdd => rdd.top(10))
>
>
> Regards,
> Laeeq
>
>
>   On Friday, March 13, 2015 1:47 PM, Sean Owen <so...@cloudera.com> wrote:
>
>
> Hm, aren't you able to use the SparkContext here? DStream operations
> happen on the driver. So you can parallelize() the result?
>
> take() won't work as it's not the same as top()
>
> On Fri, Mar 13, 2015 at 11:23 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
> > Like this?
> >
> > dstream.repartition(1).mapPartitions(it => it.take(5))
> >
> >
> >
> > Thanks
> > Best Regards
> >
> > On Fri, Mar 13, 2015 at 4:11 PM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid>
> > wrote:
> >>
> >> Hi,
> >>
> >> I normally use dstream.transform whenever I need to use methods which
> >> are available in the RDD API but not in the streaming API, e.g.
> >> dstream.transform(x => x.sortByKey(true))
> >>
> >> But there are other RDD methods which return types other than RDD, e.g.
> >> dstream.transform(x => x.top(5)); top here returns an Array.
> >>
> >> In the second scenario, how can I return an RDD rather than an array, so
> >> that I can perform further steps on the dstream?
> >>
> >> Regards,
> >> Laeeq
>
> >
> >
>