You can probably do it in a simpler but sort of hacky way!

If your window duration is W and sliding interval S, you can do some math
to figure out how many of the first windows are actually partial windows.
It's probably math.ceil(W/S) - 1 (e.g., with W = 10s and S = 5s, only the
first window is partial). So in a windowDStream.foreachRDD() you can
increment a global counter to count how many RDDs have been generated and
ignore the first few RDDs.

// foreachRDD bodies run on the driver, so a plain var is enough here
object Global { var counter = 0 }

windowDStream.foreachRDD(rdd => {
  Global.counter += 1
  // the first math.ceil(W/S) - 1 RDDs are partial windows; skip them
  // (avoid `return` inside the closure -- it triggers a non-local return)
  if (Global.counter >= math.ceil(W.toDouble / S)) {
    // do something awesome
  }
})
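
If you'd rather avoid the mutable global, a variant of the same idea (just
a sketch, assuming the (rdd, time) overload of foreachRDD in your Spark
version) is to compare each batch's time against the streaming start time
and drop everything earlier than one full window duration. windowDuration
and startTime here are values you'd set up yourself:

val windowDuration = Seconds(10)          // must match your window() call
val startTime = System.currentTimeMillis  // capture just before ssc.start()

windowDStream.foreachRDD((rdd, time) => {
  // 'time' is this windowed RDD's batch time; the window is only
  // complete once a full window duration has elapsed since start
  if (time.milliseconds - startTime >= windowDuration.milliseconds) {
    // do something awesome with the complete window
  }
})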


On Tue, Mar 25, 2014 at 7:29 AM, Adrian Mocanu <amoc...@verticalscope.com> wrote:

> Let me rephrase that,
> Do you think it is possible to use an accumulator to skip the first few
> incomplete RDDs?
>
> -----Original Message-----
> From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
> Sent: March-25-14 9:57 AM
> To: user@spark.apache.org
> Cc: u...@spark.incubator.apache.org
> Subject: RE: [bug?] streaming window unexpected behaviour
>
> Thanks TD!
> Is it possible to perhaps add another window method that doesn't
> generate partial windows? Or, is it possible to remove the first few
> partial windows? I'm thinking of using an accumulator to count how many
> windows there are.
>
> -A
>
> -----Original Message-----
> From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
> Sent: March-24-14 6:55 PM
> To: user@spark.apache.org
> Cc: u...@spark.incubator.apache.org
> Subject: Re: [bug?] streaming window unexpected behaviour
>
> Yes, I believe that is the current behavior. Essentially, the first few
> RDDs will be partial windows (assuming window duration > sliding interval).
>
> TD
>
>
> On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu <amoc...@verticalscope.com>
> wrote:
> > I have what I would call unexpected behaviour when using window on a
> > stream.
> >
> > I have 2 windowed streams with a 5s batch interval. One window stream
> > is (5s,5s)=smallWindow and the other (10s,5s)=bigWindow
> >
> > What I've noticed is that the 1st RDD produced by bigWindow is
> > incorrect: it covers only 5s of data, not 10s. So instead of waiting
> > 10s and producing one RDD covering 10s, Spark produced the 1st
> > bigWindow RDD covering only 5s.
> >
> > Why is this happening? To me it looks like a bug; Matei or TD, can you
> > verify whether this is correct behaviour?
> >
> >
> >
> >
> >
> > I have the following code:
> >
> > val ssc = new StreamingContext(conf, Seconds(5))
> >
> > val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
> > val bigWindowStream = ssc.queueStream(bigWindowRddQueue)
> >
> > val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
> >   .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
> >
> > val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
> >   .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
> >
> > -Adrian
