[ https://issues.apache.org/jira/browse/SPARK-24382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523227#comment-16523227 ]

Richard Yu edited comment on SPARK-24382 at 6/26/18 5:34 AM:
-------------------------------------------------------------

[~karthikus] Yes, the old dates do in fact matter, primarily because of the 
watermark, which you set to ten seconds. The Spark Structured Streaming 
Programming Guide states that data arriving later than the specified delay 
(in your case, 10 seconds) after it was generated is considered too late and 
is not processed. Judging by your data, the records were generated on May 1st, 
while you are processing them on May 24th. In other words, you are feeding the 
data to Spark far *later* than allowed (we are talking days here), so as 
expected, nothing is output.
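
For reference, a minimal sketch of what that implies (reusing the names from 
your snippet; the "25 days" delay is an assumed value chosen just to span the 
May 1st to May 24th gap): the watermark delay has to be at least as large as 
the lag you expect between event time and arrival.

{code:java}
// Sketch only: the same aggregation, but with a watermark delay wide enough
// to cover event times that are weeks old ("25 days" is an assumption, not a
// recommendation). Rows arriving within the delay are aggregated, not dropped.
import org.apache.spark.sql.functions.{col, count, window}

val sdvTuneInsAgg1 = df
  .withWatermark("ts2", "25 days")              // delay >= expected event-time lag
  .groupBy(window(col("ts2"), "10 seconds"))    // 10-second tumbling windows
  .agg(count("*") as "count")
{code}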



> Spark Structured Streaming aggregation on old timestamp data
> ------------------------------------------------------------
>
>                 Key: SPARK-24382
>                 URL: https://issues.apache.org/jira/browse/SPARK-24382
>             Project: Spark
>          Issue Type: Question
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Karthik
>            Priority: Major
>              Labels: beginner
>
> I am trying to aggregate the count of records every 10 seconds using 
> Structured Streaming for the following incoming Kafka data
> {code:java}
> {
>   "ts2" : "2018/05/01 00:02:50.041",
>   "serviceGroupId" : "123",
>   "userId" : "avv-0",
>   "stream" : "",
>   "lastUserActivity" : "00:02:50",
>   "lastUserActivityCount" : "0"
> }
> {
>   "ts2" : "2018/05/01 00:09:02.079",
>   "serviceGroupId" : "123",
>   "userId" : "avv-0",
>   "stream" : "",
>   "lastUserActivity" : "00:09:02",
>   "lastUserActivityCount" : "0"
> }
> {
>   "ts2" : "2018/05/01 00:09:02.086",
>   "serviceGroupId" : "123",
>   "userId" : "avv-2",
>   "stream" : "",
>   "lastUserActivity" : "00:09:02",
>   "lastUserActivityCount" : "0"
> }
> {code}
> With the following logic
> {code:java}
> val sdvTuneInsAgg1 = df
>   .withWatermark("ts2", "10 seconds")
>   .groupBy(window(col("ts2"), "10 seconds"))
>   .agg(count("*") as "count")
>   .as[CountMetric1]
>
> val query1 = sdvTuneInsAgg1.writeStream
>   .format("console")
>   .foreach(writer)
>   .start()
> {code}
> and I do not see any records inside the writer. The only anomaly is that the 
> current date is 2018/05/24, while the records I am processing (ts2) carry 
> much older dates. Will aggregation/count work in this scenario?
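
For what it's worth, a minimal sketch of the alternative (assumed setup: the 
same df, with ts2 already parsed as a TimestampType column): without a 
watermark, Spark keeps the state for every window indefinitely, so records 
with days-old event times are still aggregated; the trade-off is unbounded 
state.

{code:java}
// Sketch only: same 10-second windows, but no watermark, so old event times
// (like 2018/05/01) are still counted. Complete output mode re-emits every
// window on each trigger; the state store grows without bound.
import org.apache.spark.sql.functions.{col, count, window}

val aggAll = df
  .groupBy(window(col("ts2"), "10 seconds"))
  .agg(count("*") as "count")

val queryAll = aggAll.writeStream
  .outputMode("complete")
  .format("console")
  .start()
{code}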


