Hi Sandeep,

Watermarks are used in aggregation queries to ensure correctness and to clean up state. They don't drop records in map-only queries, which is what you have in your example. If you run the same test with a `groupBy().count()` aggregation, you will see that the count doesn't increase for the last (late) event.
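To make the mechanics concrete, here is a minimal plain-Scala sketch of the watermark rule (illustrative only, not actual Spark internals): the watermark is the maximum event time seen so far minus the allowed delay, and in an aggregation a row whose event time falls behind the watermark is dropped. The event times and the 10-second delay are taken from your example; the object and method names are made up for illustration.

```scala
// Plain-Scala sketch of watermark semantics in an aggregation query
// (illustrative only; not Spark Structured Streaming code).
object WatermarkSketch {
  val delaySeconds = 10L // mirrors withWatermark("timeStampValue", "10 seconds")

  // Event times (epoch seconds) in arrival order, as in the example.
  val arrivals = Seq(1539844822L, 1539844842L, 1539844862L, 1539844882L, 1539844852L)

  // Returns per-event-time counts after applying watermark filtering.
  def run(events: Seq[Long]): Map[Long, Long] = {
    var maxEventTime = Long.MinValue
    var counts = Map.empty[Long, Long]
    for (t <- events) {
      // Watermark = max event time seen so far minus the allowed delay.
      val watermark =
        if (maxEventTime == Long.MinValue) Long.MinValue
        else maxEventTime - delaySeconds
      // An aggregation drops rows that are behind the watermark;
      // a map-only query would simply pass every row through.
      if (t >= watermark) counts = counts.updated(t, counts.getOrElse(t, 0L) + 1)
      maxEventTime = math.max(maxEventTime, t)
    }
    counts
  }
}
```

Running `WatermarkSketch.run(WatermarkSketch.arrivals)` yields counts for the four on-time events but not for 1539844852, because by the time it arrives the watermark has advanced to 1539844872 (1539844882 - 10). That is exactly the behaviour a `groupBy().count()` would show, while your map-only query prints the late row anyway.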
On Thu, Oct 18, 2018 at 8:48 AM sandeep_katta <sandeep0102.opensou...@gmail.com> wrote:
>
> I am trying to test the watermark concept in structured streaming using
> the below program
>
> import java.sql.Timestamp
> import org.apache.spark.sql.functions.{col, expr}
> import org.apache.spark.sql.streaming.Trigger
>
> val lines_stream = spark.readStream.
>   format("kafka").
>   option("kafka.bootstrap.servers", "vm1:21005,vm2:21005").
>   option("subscribe", "s1").
>   load().
>   select('value.cast("String") as "key",
>     ('value.cast("String")).cast("long").cast("timestamp") as "timeStampValue").
>   select("key", "timeStampValue").
>   withWatermark("timeStampValue", "10 seconds")
>
> val query = lines_stream.
>   writeStream.
>   option("truncate", "false").
>   outputMode("append").
>   format("console").
>   trigger(Trigger.ProcessingTime(3000)).
>   start()
>
> query.awaitTermination()
>
> // Corresponding output
>
> scala> query.awaitTermination()
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---+--------------+
> |key|timeStampValue|
> +---+--------------+
> +---+--------------+
>
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----------+-------------------+
> |key       |timeStampValue     |
> +----------+-------------------+
> |1539844822|2018-10-18 14:40:22|
> +----------+-------------------+
>
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +----------+-------------------+
> |key       |timeStampValue     |
> +----------+-------------------+
> |1539844842|2018-10-18 14:40:42|
> +----------+-------------------+
>
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +----------+-------------------+
> |key       |timeStampValue     |
> +----------+-------------------+
> |1539844862|2018-10-18 14:41:02|
> +----------+-------------------+
>
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +----------+-------------------+
> |key       |timeStampValue     |
> +----------+-------------------+
> |1539844882|2018-10-18 14:41:22|
> +----------+-------------------+
>
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +----------+-------------------+
> |key       |timeStampValue     |
> +----------+-------------------+
> |1539844852|2018-10-18 14:40:52| // As per the watermark this event should be discarded, but it wasn't
> +----------+-------------------+
>
> Note: below are the values I sent from kafka-producer
> 1539844822
> 1539844842
> 1539844862
> 1539844882
> 1539844852
>
> Is this the correct way to test watermark scenarios?
>
> Regards
> Sandeep Katta