Now I've added the same aggregation query as below, but it still didn't filter the late data.
// Imports needed for the symbol ('col) syntax and the SQL functions used below
import org.apache.spark.sql.functions._
import spark.implicits._

val lines_stream = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "vm3:21005,vm2:21005").
  option("subscribe", "s1").
  load().
  withColumn("tokens", split('value cast "string", ",")).  // Kafka's value column is binary, cast to string before splitting
  withColumn("seconds", 'tokens(1) cast "long").
  withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- event time has to be a timestamp
  withColumn("id", 'tokens(0)).
  select("id", "event_time").
  withWatermark("event_time", "10 seconds").
  groupBy("id").count()
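The snippet above is only the transformation; the batch output below comes from starting the stream against a sink. A minimal sketch of how it could be started, assuming a console sink in "update" output mode (the sink, output mode, and truncate option are assumptions, not part of the original query):

// Minimal sketch (assumption): start the query against the console sink
// in "update" output mode so each micro-batch prints the changed counts.
val query = lines_stream.writeStream.
  format("console").
  outputMode("update").
  option("truncate", "false").
  start()

query.awaitTermination()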
// Output
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
+---+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1 |1 |
+---+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1 |2 |
+---+-----+
-------------------------------------------
Batch: 3
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1 |4 |
+---+-----+
-------------------------------------------
Batch: 4
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1 |5 | // It should still be 4
+---+-----+
// Values sent from the Kafka producer
1,1539844822
1,1539844842
1,1539844862
1,1539844882
1,1539844852
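For reference, an arithmetic check of the expectation above (the cutoff computation is my own illustration, not Spark output):

// The largest event time seen before the last record is 1539844882.
// With a 10-second watermark the expected cutoff is 1539844882 - 10 = 1539844872,
// and the last record's event time 1539844852 is 20 seconds behind that cutoff,
// which is why it was expected to be dropped in Batch 4.
val maxEventSeconds  = 1539844882L
val watermarkCutoff  = maxEventSeconds - 10L   // 1539844872
val lateEventSeconds = 1539844852L
println(lateEventSeconds < watermarkCutoff)    // true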