Now I've added the aggregation query shown below, but it still doesn't filter out the late record.

import org.apache.spark.sql.functions._
import spark.implicits._

val lines_stream = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", "vm3:21005,vm2:21005").
      option("subscribe", "s1").
      load().
      withColumn("tokens", split('value, ",")).
      withColumn("seconds", 'tokens(1) cast "long").
      withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- event time has to be a timestamp
      withColumn("id", 'tokens(0)).
      select("id", "event_time").
      withWatermark("event_time", "10 seconds").
      groupBy("id").
      count()
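The writeStream call isn't included above; the console batches below were produced by something roughly like the following sketch (the Complete output mode and the console options are my assumptions, they are not part of the original query):

// Sketch of the sink used to print the batches below; output mode is assumed.
val query = lines_stream.writeStream.
      format("console").
      option("truncate", "false").
      outputMode("complete").
      start()

query.awaitTermination()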

// Output observed on the console sink
-------------------------------------------                                     
Batch: 0
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
+---+-----+

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |1    |
+---+-----+

-------------------------------------------                                     
Batch: 2
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |2    |
+---+-----+

-------------------------------------------                                     
Batch: 3
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |4    |
+---+-----+

-------------------------------------------                                     
Batch: 4
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |5    | // should still be 4: the last record is more than 10 seconds behind the max event time
+---+-----+

// Values sent from the Kafka producer, in this order (format: id,epoch_seconds)

1,1539844822
1,1539844842
1,1539844862
1,1539844882
1,1539844852
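
As a quick local check (not part of the streaming job), converting the epoch seconds above to timestamps shows how late the last record is:

// Convert the produced epoch seconds to timestamps to see the lateness.
import java.time.Instant

val seconds = Seq(1539844822L, 1539844842L, 1539844862L, 1539844882L, 1539844852L)
seconds.foreach(s => println(s"$s -> ${Instant.ofEpochSecond(s)}"))

// The last value, 1539844852, is 30 seconds behind the maximum event time seen
// so far (1539844882), i.e. well past the 10-second watermark, which is why I
// expect it to be dropped rather than counted.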



