Now I have added the same aggregation query, shown below, but it still didn't filter out the late record:
val lines_stream = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "vm3:21005,vm2:21005").
  option("subscribe", "s1").
  load().
  withColumn("tokens", split('value, ",")).
  withColumn("seconds", 'tokens(1) cast "long").
  withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- Event time has to be a timestamp
  withColumn("id", 'tokens(0)).
  select("id", "event_time").
  withWatermark("event_time", "10 seconds").
  groupBy("id").
  count()

//Output
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
+---+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |1    |
+---+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |2    |
+---+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |4    |
+---+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |5    |   // It should still be 4
+---+-----+

//values sent from Kafka-producer
1,1539844822
1,1539844842
1,1539844862
1,1539844882
1,1539844852
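My understanding is that the watermark only drops late rows when the aggregation is keyed on the event-time column or a window over it, so I am wondering whether a windowed variant is required here. Below is a sketch of such a variant for comparison (same Kafka source and column names as above; the 20-second window duration is just an example value I picked, and I added an explicit cast of the Kafka value to string):

import org.apache.spark.sql.functions._
import spark.implicits._

// Sketch: same pipeline as above, but grouping on a window over event_time
// so the 10-second watermark has an event-time key to compare against.
val windowed_counts = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "vm3:21005,vm2:21005").
  option("subscribe", "s1").
  load().
  withColumn("tokens", split('value cast "string", ",")).   // Kafka value is binary, cast to string before splitting
  withColumn("seconds", 'tokens(1) cast "long").
  withColumn("event_time", to_timestamp(from_unixtime('seconds))).
  withColumn("id", 'tokens(0)).
  select("id", "event_time").
  withWatermark("event_time", "10 seconds").
  groupBy(window('event_time, "20 seconds"), 'id).           // event-time window in the grouping key
  count()

Is it expected that the non-windowed groupBy("id") query keeps counting the late 1,1539844852 record, and only the windowed form would drop it?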