Thanks Ted! I have created the JIRA ticket:
https://issues.apache.org/jira/browse/SPARK-13707

As I commented on the ticket, I would like to work on the fix once we decide
what the correct behavior should be.

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Mar 6, 2016 at 11:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Suggest logging a new issue with details provided in this thread.
>
> Thanks
>
> On Sun, Mar 6, 2016 at 9:56 AM, Jatin Kumar <jku...@rocketfuelinc.com>
> wrote:
>
>> Hello Ted,
>>
>> The JIRA you pointed is a different issue. Here are more details of the
>> issue I am talking about:
>>
>> Consider this code block:
>>
>> // 2-second base batch interval
>> val streamingContext = new StreamingContext(sparkConfig, Seconds(2))
>>
>> val totalVideoImps =
>>   streamingContext.sparkContext.accumulator(0, "TotalVideoImpressions")
>> val totalImps =
>>   streamingContext.sparkContext.accumulator(0, "TotalImpressions")
>>
>> val stream = KafkaReader.KafkaDirectStream(streamingContext)
>> stream.map(KafkaAdLogParser.parseAdLogRecord)
>>   .filter(record => {
>>     totalImps += 1 // counts every parsed record
>>     KafkaAdLogParser.isVideoRecord(record)
>>   })
>>   .map(record => {
>>     totalVideoImps += 1 // counts only the video records
>>     record.url
>>   })
>>   .window(Seconds(120)) // 120-second windows over the 2-second batches
>>   .count()
>>   .foreachRDD((rdd, time) => {
>>     println("Timestamp: " + ImpressionAggregator.millsToDate(time.milliseconds))
>>     println("Count: " + rdd.collect()(0))
>>     println("Total Impressions: " + totalImps.value)
>>     totalImps.setValue(0)
>>     println("Total Video Impressions: " + totalVideoImps.value)
>>     totalVideoImps.setValue(0)
>>   })
>> streamingContext.start()
>> streamingContext.awaitTermination()
>>
>>
>> The batch size before the window operation is 2 seconds, and after the window
>> operation each batch covers 120 seconds. The output of the above program for
>> the first few 120-second batches is:
>>
>> Timestamp: 2016-03-06 12:02:56,000
>> Count: 362195
>> Total Impressions: 16882431
>> Total Video Impressions: 362195
>>
>> Timestamp: 2016-03-06 12:04:56,000
>> Count: 367168
>> Total Impressions: 19480293
>> Total Video Impressions: 367168
>>
>> Timestamp: 2016-03-06 12:06:56,000
>> Count: 177711
>> Total Impressions: 10196677
>> Total Video Impressions: 177711
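>>
>> (For reference: with a 2-second base batch and a 120-second window, each
>> windowed batch above should cover 60 of the underlying 2-second micro-batches.)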
>>
>> The Spark UI, however, shows different numbers, as attached in the image.
>> Also, if we check the start and end Kafka partition offsets reported by
>> subsequent batch entries on the UI, they do not form a continuous overall
>> range. All numbers are fine if we remove the window operation though.
>>
>> I think this is a bug, and I couldn't find any existing JIRA for it.
>>
>> --
>> Thanks
>> Jatin
>>
>> On Sun, Mar 6, 2016 at 8:29 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Have you taken a look at SPARK-12739 ?
>>>
>>> FYI
>>>
>>> On Sun, Mar 6, 2016 at 4:06 AM, Jatin Kumar <
>>> jku...@rocketfuelinc.com.invalid> wrote:
>>>
>>>> Hello all,
>>>>
>>>> Consider the following two code blocks:
>>>>
>>>> val ssc = new StreamingContext(sparkConfig, Seconds(2))
>>>> val stream = KafkaUtils.createDirectStream(...)
>>>>
>>>> a) stream.filter(filterFunc).count()
>>>>      .foreachRDD(rdd => println(rdd.collect()))
>>>> b) stream.filter(filterFunc).window(Seconds(60), Seconds(60)).count()
>>>>      .foreachRDD(rdd => println(rdd.collect()))
>>>>
>>>> I have observed that in case
>>>> a) the UI behaves correctly and the numbers reported for each batch are
>>>> correct;
>>>> b) the UI reports numbers every 60 seconds, but the batch-id, input-size,
>>>> etc. shown are those of the single 2-second batch at each 60-second
>>>> boundary, i.e. the 30th batch, 60th batch, etc. These numbers are
>>>> essentially useless, in fact confusing, in this case, though the delay and
>>>> processing-time numbers are still helpful.
>>>>
>>>> Is someone working on a fix to show aggregated numbers which will be
>>>> more useful?
>>>>
>>>> --
>>>> Thanks
>>>> Jatin
>>>>
>>>
>>>
>>
>
