[ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-26154:
----------------------------------
Target Version/s: 3.0.0
> Stream-stream joins - left outer join gives inconsistent output
> ---------------------------------------------------------------
>
> Key: SPARK-26154
> URL: https://issues.apache.org/jira/browse/SPARK-26154
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.2, 3.0.0
> Environment: Spark version - Spark 2.3.2
> OS - SUSE 11
> Reporter: Haripriya
> Assignee: Jungtaek Lim
> Priority: Blocker
> Labels: correctness
> Fix For: 3.0.0
>
>
> A stream-stream left outer join gives inconsistent output: data that has
> already been processed is processed again and emitted with null values. In
> batch 2, the input "3" is joined with its matching right-side row, but in
> batch 6 the same left row "3" is emitted again with null right-side columns.
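For reference, the expected result can be modelled without Spark. The following is a minimal sketch, not Spark's implementation: it applies the join condition from the query in the steps below (`data == data1`, with `recordTime1` between `recordTime` and `recordTime + 5 seconds`) to finite in-memory rows, and it assumes timestamps are plain millisecond offsets. The names `LeftOuterSemantics`, `LeftRow`, `RightRow` and `leftOuterJoin` are hypothetical. In a correct left outer join, each left row is emitted either with its matches or exactly once null-padded, never both.

```scala
// Hypothetical, Spark-free model of the expected left outer join semantics for
//   data == data1 AND recordTime1 >= recordTime AND recordTime1 <= recordTime + 5 seconds
// Timestamps are modelled as millisecond offsets (an assumption for illustration).
object LeftOuterSemantics {
  final case class LeftRow(data: String, recordTime: Long)
  final case class RightRow(data1: String, recordTime1: Long)

  // "interval 5 seconds" from the join condition, in milliseconds
  val WindowMillis: Long = 5000L

  def matches(l: LeftRow, r: RightRow): Boolean =
    l.data == r.data1 &&
      r.recordTime1 >= l.recordTime &&
      r.recordTime1 <= l.recordTime + WindowMillis

  // Batch (non-streaming) left outer join: a left row yields all of its matches,
  // or exactly one null-padded row (None) when nothing matches, never both.
  def leftOuterJoin(lefts: Seq[LeftRow], rights: Seq[RightRow]): Seq[(LeftRow, Option[RightRow])] =
    lefts.flatMap { l =>
      val ms = rights.filter(r => matches(l, r))
      if (ms.isEmpty) Seq(l -> Option.empty[RightRow])
      else ms.map(r => l -> Option(r))
    }

  def main(args: Array[String]): Unit = {
    // Offsets in ms after 20:09:00, taken from rows "3" and "1" in the reported output.
    val lefts = Seq(LeftRow("3", 35053L), LeftRow("1", 22662L))
    val rights = Seq(RightRow("3", 36506L))
    leftOuterJoin(lefts, rights).foreach(println)
  }
}
```

In this model the left row "3" appears once, matched, and "1" appears once, null-padded, since "1" was never produced to topic2; a second, null-padded "3" row has no counterpart.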
> Steps to reproduce
> Step 1: Run the following in spark-shell:
> {code:java}
> scala> import org.apache.spark.sql.functions.{col, expr}
> import org.apache.spark.sql.functions.{col, expr}
> scala> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.streaming.Trigger
> scala> import java.sql.Timestamp
> import java.sql.Timestamp
> scala> val lines_stream1 = spark.readStream.
> | format("kafka").
> | option("kafka.bootstrap.servers", "ip:9092").
> | option("subscribe", "topic1").
> | option("includeTimestamp", true).
> | load().
> | selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
> | select(col("value") as("data"),col("timestamp") as("recordTime")).
> | select("data","recordTime").
> | withWatermark("recordTime", "5 seconds ")
> lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data: string, recordTime: timestamp]
> scala> val lines_stream2 = spark.readStream.
> | format("kafka").
> | option("kafka.bootstrap.servers", "ip:9092").
> | option("subscribe", "topic2").
> | option("includeTimestamp", value = true).
> | load().
> | selectExpr("CAST (value AS String)","CAST(timestamp AS TIMESTAMP)").as[(String,Timestamp)].
> | select(col("value") as("data1"),col("timestamp") as("recordTime1")).
> | select("data1","recordTime1").
> | withWatermark("recordTime1", "10 seconds ")
> lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [data1: string, recordTime1: timestamp]
> scala> val query = lines_stream1.join(lines_stream2, expr (
> | """
> | | data == data1 and
> | | recordTime1 >= recordTime and
> | | recordTime1 <= recordTime + interval 5 seconds
> | """.stripMargin),"left").
> | writeStream.
> | option("truncate","false").
> | outputMode("append").
> | format("console").option("checkpointLocation", "/tmp/leftouter/").
> | trigger(Trigger.ProcessingTime ("5 seconds")).
> | start()
> query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b
> {code}
> Step 2: Start producing data
> kafka-console-producer.sh --broker-list ip:9092 --topic topic1
> >1
> >2
> >3
> >4
> >5
> >aa
> >bb
> >cc
> kafka-console-producer.sh --broker-list ip:9092 --topic topic2
> >2
> >2
> >3
> >4
> >5
> >aa
> >cc
> >ee
> >ee
>
> Output obtained:
> {code:java}
> Batch: 0
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----+----------+-----+-----------+
> |data|recordTime|data1|recordTime1|
> +----+----------+-----+-----------+
> +----+----------+-----+-----------+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |3   |2018-11-22 20:09:35.053|3    |2018-11-22 20:09:36.506|
> |2   |2018-11-22 20:09:31.613|2    |2018-11-22 20:09:33.116|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |4   |2018-11-22 20:09:38.654|4    |2018-11-22 20:09:39.818|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |5   |2018-11-22 20:09:44.809|5    |2018-11-22 20:09:47.452|
> |1   |2018-11-22 20:09:22.662|null |null                   |
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +----+-----------------------+-----+-----------------------+
> |data|recordTime             |data1|recordTime1            |
> +----+-----------------------+-----+-----------------------+
> |cc  |2018-11-22 20:10:06.654|cc   |2018-11-22 20:10:08.701|
> |aa  |2018-11-22 20:10:01.536|aa   |2018-11-22 20:10:03.259|
> +----+-----------------------+-----+-----------------------+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +----+-----------------------+-----+-----------+
> |data|recordTime             |data1|recordTime1|
> +----+-----------------------+-----+-----------+
> |3   |2018-11-22 20:09:35.053|null |null       |
> +----+-----------------------+-----+-----------+
> {code}
>
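The inconsistency in the output above can be checked mechanically. The sketch below transcribes the non-empty batches into plain Scala values (the `DuplicateLeftRowCheck` object, the `OutRow` type and the `inconsistentLeftRows` helper are hypothetical, written only for this check) and reports any left row, identified by `data` and `recordTime`, that appears both matched and null-padded. The batch 2 match for "3" does satisfy the join condition: 36.506 s - 35.053 s = 1.453 s, which lies within the 0 to 5 second bound.

```scala
// Hypothetical check over rows transcribed from the console output of batches 2-6.
// data1 == None marks a null-padded row.
object DuplicateLeftRowCheck {
  final case class OutRow(batch: Int, data: String, recordTime: String, data1: Option[String])

  val observed: Seq[OutRow] = Seq(
    OutRow(2, "3", "2018-11-22 20:09:35.053", Some("3")),
    OutRow(2, "2", "2018-11-22 20:09:31.613", Some("2")),
    OutRow(3, "4", "2018-11-22 20:09:38.654", Some("4")),
    OutRow(4, "5", "2018-11-22 20:09:44.809", Some("5")),
    OutRow(4, "1", "2018-11-22 20:09:22.662", None),
    OutRow(5, "cc", "2018-11-22 20:10:06.654", Some("cc")),
    OutRow(5, "aa", "2018-11-22 20:10:01.536", Some("aa")),
    OutRow(6, "3", "2018-11-22 20:09:35.053", None)
  )

  // A left row that is emitted both with a match and null-padded
  // violates left outer join semantics.
  def inconsistentLeftRows(rows: Seq[OutRow]): Set[(String, String)] =
    rows
      .groupBy(r => (r.data, r.recordTime))
      .filter { case (_, rs) => rs.exists(_.data1.isDefined) && rs.exists(_.data1.isEmpty) }
      .keySet
      .toSet

  def main(args: Array[String]): Unit =
    println(inconsistentLeftRows(observed)) // prints Set((3,2018-11-22 20:09:35.053))
}
```

Only "3" is flagged. Row "1" is null-padded once, which is expected because "1" was never produced to topic2.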