Thanks Ted! I have created the JIRA ticket https://issues.apache.org/jira/browse/SPARK-13707.
As I commented there, I would like to work on the fix once we decide what
the correct behavior should be.

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Mar 6, 2016 at 11:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Suggest logging a new issue with details provided in this thread.
>
> Thanks
>
> On Sun, Mar 6, 2016 at 9:56 AM, Jatin Kumar <jku...@rocketfuelinc.com> wrote:
>
>> Hello Ted,
>>
>> The JIRA you pointed to is a different issue. Here are more details of
>> the issue I am talking about.
>>
>> Consider a code block like this:
>>
>> val streamingContext = new StreamingContext(sparkConfig, Seconds(2))
>>
>> val totalVideoImps = streamingContext.sparkContext.accumulator(0, "TotalVideoImpressions")
>> val totalImps = streamingContext.sparkContext.accumulator(0, "TotalImpressions")
>>
>> val stream = KafkaReader.KafkaDirectStream(streamingContext)
>> stream.map(KafkaAdLogParser.parseAdLogRecord)
>>   .filter(record => {
>>     totalImps += 1
>>     KafkaAdLogParser.isVideoRecord(record)
>>   })
>>   .map(record => {
>>     totalVideoImps += 1
>>     record.url
>>   })
>>   .window(Seconds(120))
>>   .count().foreachRDD((rdd, time) => {
>>     println("Timestamp: " + ImpressionAggregator.millsToDate(time.milliseconds))
>>     println("Count: " + rdd.collect()(0))
>>     println("Total Impressions: " + totalImps.value)
>>     totalImps.setValue(0)
>>     println("Total Video Impressions: " + totalVideoImps.value)
>>     totalVideoImps.setValue(0)
>>   })
>> streamingContext.start()
>> streamingContext.awaitTermination()
>>
>> The batch size before the window operation is 2 seconds; after the window
>> the batches are 120 seconds each.
>> Now the output of the above program for the first few 120-sec batches is:
>>
>> Timestamp: 2016-03-06 12:02:56,000
>> Count: 362195
>> Total Impressions: 16882431
>> Total Video Impressions: 362195
>>
>> Timestamp: 2016-03-06 12:04:56,000
>> Count: 367168
>> Total Impressions: 19480293
>> Total Video Impressions: 367168
>>
>> Timestamp: 2016-03-06 12:06:56,000
>> Count: 177711
>> Total Impressions: 10196677
>> Total Video Impressions: 177711
>>
>> The Spark UI, however, shows different numbers, as attached in the image.
>> Also, if we check the start and end Kafka partition offsets reported by
>> subsequent batch entries on the UI, they do not form an overall
>> continuous range. All numbers are fine if we remove the window operation,
>> though.
>>
>> I think this is a bug, and I couldn't find any existing report of it.
>>
>> --
>> Thanks
>> Jatin
>>
>> On Sun, Mar 6, 2016 at 8:29 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Have you taken a look at SPARK-12739?
>>>
>>> FYI
>>>
>>> On Sun, Mar 6, 2016 at 4:06 AM, Jatin Kumar <jku...@rocketfuelinc.com.invalid> wrote:
>>>
>>>> Hello all,
>>>>
>>>> Consider the following two code blocks:
>>>>
>>>> val ssc = new StreamingContext(sparkConfig, Seconds(2))
>>>> val stream = KafkaUtils.createDirectStream(...)
>>>>
>>>> a) stream.filter(filterFunc).count().foreachRDD(rdd => println(rdd.collect()))
>>>> b) stream.filter(filterFunc).window(Seconds(60), Seconds(60)).count().foreachRDD(rdd => println(rdd.collect()))
>>>>
>>>> I have observed that in case:
>>>> a) the UI behaves correctly and the numbers reported for each batch
>>>> are correct.
>>>> b) the UI reports numbers every 60 seconds, but the batch-id, input-size
>>>> etc. are those of the single 2-sec batch that falls on each 60-second
>>>> boundary, i.e. the 30th batch, 60th batch etc. These numbers become
>>>> totally useless, in fact confusing, in this case, though the
>>>> delay/processing-time numbers are still helpful.
>>>>
>>>> Is someone working on a fix to show aggregated numbers, which would be
>>>> more useful?
>>>>
>>>> --
>>>> Thanks
>>>> Jatin
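[Editor's note] A plausible mechanism behind the inflated accumulator values discussed above is that side effects placed inside lazy transformations (filter/map) run again whenever the underlying data is re-evaluated, which windowing over an un-cached DStream can trigger. The following is a minimal sketch of that mechanism outside Spark, using a plain Scala lazy view to stand in for an RDD; the object name and the counts are illustrative, not taken from the thread, and this is not a claim about Spark internals in this exact case:

```scala
object WindowRecountSketch {
  def main(args: Array[String]): Unit = {
    // Plays the role of the "TotalImpressions" accumulator from the thread.
    var totalImps = 0

    // A lazy view stands in for an RDD: its transformations re-run on every
    // evaluation, much like an un-cached RDD being recomputed when a window
    // covers the same data more than once.
    val records = (1 to 5).view.map { r =>
      totalImps += 1 // side effect inside a transformation
      r
    }

    records.toList // first evaluation (the 2-sec batch)
    records.toList // second evaluation (same data re-read for the window)

    // 5 records were seen, but totalImps is 10: each re-evaluation of the
    // lazy pipeline bumps the counter again.
    println(s"records seen: 5, totalImps counted: $totalImps")
  }
}
```

If counting must stay inside the pipeline, one mitigation is to persist the transformed stream before applying the window (DStream exposes `cache()`/`persist()`), or to do the counting inside `foreachRDD` on the windowed stream, where the action runs exactly once per output batch.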