Which version of Spark do you use?

You can attach a streaming query listener and print out the
QueryProgressEvent to track the watermark. The watermark value is
updated per batch, and the next batch uses that value.
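A minimal sketch of such a listener (assuming Spark 2.x Structured Streaming; the app name is arbitrary):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder.appName("watermark-debug").getOrCreate()

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // eventTime is a map with entries such as "watermark", "max", "min", "avg"
    println(s"batch=${event.progress.batchId} eventTime=${event.progress.eventTime}")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})
```

Running your query with this listener attached prints the watermark the engine is actually using for each batch, which you can compare against the event times you send.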

If the watermark exceeds the last timestamp but the value is still being
added, please let me know the Spark version as well as the physical plan (if
you don't mind) and I can take a look.
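For reference, the expected cutoff from your producer values can be worked out by hand (a sketch, assuming the watermark is max event time seen in previous batches minus the delay):

```scala
// Watermark arithmetic for the quoted input (assumption: watermark =
// max event time observed so far - configured delay).
val delay = 10L                // seconds, from withWatermark("event_time", "10 seconds")
val maxSeen = 1539844882L      // largest event-time second sent so far
val watermark = maxSeen - delay // 1539844872
val lateRow = 1539844852L      // the last value you sent
assert(lateRow < watermark)    // so this row should be dropped once the
                               // watermark from the previous batch applies
```

Note the watermark computed from a batch only takes effect in the following batch, so the exact batch boundaries matter when judging whether a row is late.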

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Oct 18, 2018 at 5:51 PM, sandeep_katta <sandeep0102.opensou...@gmail.com>
wrote:

> Now I've added the same aggregation query as below, but it still didn't
> filter:
>
> val lines_stream = spark.readStream.
>       format("kafka").
>       option("kafka.bootstrap.servers", "vm3:21005,vm2:21005").
>       option("subscribe", "s1").
>       load().
>       withColumn("tokens", split('value, ",")).
>       withColumn("seconds", 'tokens(1) cast "long").
>       withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- Event time has to be a timestamp
>       withColumn("id", 'tokens(0)).
>       select("id", "event_time").
>       withWatermark("event_time", "10 seconds").
>       groupBy("id").count()
>
> //Output
> -------------------------------------------
>
> Batch: 0
> -------------------------------------------
> +---+-----+
> |id |count|
> +---+-----+
> +---+-----+
>
> -------------------------------------------
>
> Batch: 1
> -------------------------------------------
> +---+-----+
> |id |count|
> +---+-----+
> |1  |1    |
> +---+-----+
>
> -------------------------------------------
>
> Batch: 2
> -------------------------------------------
> +---+-----+
> |id |count|
> +---+-----+
> |1  |2    |
> +---+-----+
>
> -------------------------------------------
>
> Batch: 3
> -------------------------------------------
> +---+-----+
> |id |count|
> +---+-----+
> |1  |4    |
> +---+-----+
>
> -------------------------------------------
>
> Batch: 4
> -------------------------------------------
> +---+-----+
> |id |count|
> +---+-----+
> |1  |5    | // It should still be 4
> +---+-----+
>
> //values sent from Kafka-producer
>
> 1,1539844822
> 1,1539844842
> 1,1539844862
> 1,1539844882
> 1,1539844852
>
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
