I am trying to test the watermark concept in Structured Streaming using the
program below:
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

// Each Kafka message is an epoch-seconds string, e.g. "1539844822"
val lines_stream = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "vm1:21005,vm2:21005").
  option("subscribe", "s1").
  load().
  select(
    col("value").cast("string") as "key",
    // string -> long (epoch seconds) -> timestamp, used as the event time
    col("value").cast("string").cast("long").cast("timestamp") as "timeStampValue").
  withWatermark("timeStampValue", "10 seconds")

val query = lines_stream.
  writeStream.
  option("truncate", "false").
  outputMode("append").
  format("console").
  trigger(Trigger.ProcessingTime(3000)). // one micro-batch every 3000 ms
  start()
query.awaitTermination()
Corresponding output:
scala> query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+---+--------------+
|key|timeStampValue|
+---+--------------+
+---+--------------+
-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------------------+
|key |timeStampValue |
+----------+-------------------+
|1539844822|2018-10-18 14:40:22|
+----------+-------------------+
-------------------------------------------
Batch: 2
-------------------------------------------
+----------+-------------------+
|key |timeStampValue |
+----------+-------------------+
|1539844842|2018-10-18 14:40:42|
+----------+-------------------+
-------------------------------------------
Batch: 3
-------------------------------------------
+----------+-------------------+
|key |timeStampValue |
+----------+-------------------+
|1539844862|2018-10-18 14:41:02|
+----------+-------------------+
-------------------------------------------
Batch: 4
-------------------------------------------
+----------+-------------------+
|key |timeStampValue |
+----------+-------------------+
|1539844882|2018-10-18 14:41:22|
+----------+-------------------+
-------------------------------------------
Batch: 5
-------------------------------------------
+----------+-------------------+
|key |timeStampValue |
+----------+-------------------+
|1539844852|2018-10-18 14:40:52|
+----------+-------------------+
As per the watermark, this event should have been discarded, but it was not.
Note: below are the values I sent from the Kafka producer, in this order:
1539844822
1539844842
1539844862
1539844882
1539844852
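For reference, the watermark arithmetic for these five values can be checked by hand with the small sketch below. It does not call Spark. It assumes (as the batches above suggest) that each value lands in its own micro-batch, that the watermark used for a batch is the maximum event time seen in earlier batches minus the 10 second delay, and that it starts at 0 before any data arrives; it works in epoch seconds. The names WatermarkSim and simulate are hypothetical. The Structured Streaming programming guide describes the watermark as bounding state for stateful operators (e.g. a windowed groupBy or dropDuplicates), and says data older than the watermark is not guaranteed to be dropped, so the "late" flag here only marks rows that are behind the watermark, not rows Spark must discard.

```scala
// Hypothetical helper: reproduces only the watermark bookkeeping
// (watermark = max event time from earlier batches - delay).
// It does NOT call Spark and does not model any specific operator.
object WatermarkSim {
  /** Treats each event (epoch seconds) as its own micro-batch and returns
    * (eventTime, watermarkInEffect, isBehindWatermark) for each one. */
  def simulate(events: Seq[Long], delaySec: Long): Seq[(Long, Long, Boolean)] = {
    var maxSeen = Long.MinValue
    // toList forces strict, in-order evaluation of the stateful map
    events.toList.map { t =>
      // The watermark for this batch comes from earlier batches only;
      // assumed to start at 0 before any data has been seen.
      val watermark = if (maxSeen == Long.MinValue) 0L else maxSeen - delaySec
      val behind = t < watermark
      maxSeen = math.max(maxSeen, t)
      (t, watermark, behind)
    }
  }

  def main(args: Array[String]): Unit = {
    val sent = Seq(1539844822L, 1539844842L, 1539844862L, 1539844882L, 1539844852L)
    simulate(sent, 10L).foreach { case (t, wm, behind) =>
      println(s"event=$t watermark=$wm behind=$behind")
    }
  }
}
```

Running main prints the following; only the last value, 1539844852, arrives after the watermark has moved to 1539844872:

event=1539844822 watermark=0 behind=false
event=1539844842 watermark=1539844812 behind=false
event=1539844862 watermark=1539844832 behind=false
event=1539844882 watermark=1539844852 behind=false
event=1539844852 watermark=1539844872 behind=true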
Is this the correct way to test watermark scenarios?
Regards
Sandeep Katta
--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/