Sachin Ramachandra Setty created SPARK-25834:
------------------------------------------------
Summary: Stream-stream outer join with update mode is not throwing exception
Key: SPARK-25834
URL: https://issues.apache.org/jira/browse/SPARK-25834
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 2.3.2, 2.3.1
Reporter: Sachin Ramachandra Setty
Execute the program below and observe that no AnalysisException is thrown:
import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.streaming.Trigger
val lines_stream1 = spark.readStream.
format("kafka").
option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
option("subscribe", "test11").
option("includeTimestamp", true).
load().
selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
select(col("value") as("data"),col("timestamp") as("recordTime")).
select("data","recordTime").
withWatermark("recordTime", "20 seconds ")
val lines_stream2 = spark.readStream.
format("kafka").
option("kafka.bootstrap.servers", "10.18.99.58:21005,10.18.99.55:21005").
option("subscribe", "test22").
option("includeTimestamp", value = true).
load().
selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)].
select(col("value") as("data1"),col("timestamp") as("recordTime1")).
select("data1","recordTime1").
withWatermark("recordTime1", "20 seconds ")
val query = lines_stream1.join(lines_stream2, expr(
"""
| data == data1 and
| recordTime1 >= recordTime and
| recordTime1 <= recordTime + interval 20 seconds
""".stripMargin), "left").
writeStream.
option("truncate","false").
outputMode("update").
format("console").
trigger(Trigger.ProcessingTime ("2 second")).
start()
query.awaitTermination()
According to the documentation at
https://spark.apache.org/docs/2.3.2/structured-streaming-programming-guide.html#stream-stream-joins
stream-stream joins are supported only in Append output mode:
*As of Spark 2.3, you can use joins only when the query is in Append output
mode. Other output modes are not yet supported.*
An inner join behaves as documented and rejects update mode, but the check
fails for outer joins: the query starts and no exception is thrown.
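The comparison between inner and outer joins can be sketched without a Kafka cluster. The following is a minimal sketch, not part of the original report. It assumes a local Spark 2.3.x session and swaps the Kafka topics for the built-in "rate" source. The object name UpdateModeJoinCheck and the helpers stream and rejectsUpdateMode are hypothetical names introduced only for illustration.

```scala
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr
import scala.util.{Failure, Success, Try}

// Hypothetical helper object, not taken from the report.
object UpdateModeJoinCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-25834-check")
      .getOrCreate()

    // Assumption: the "rate" source stands in for the Kafka topics.
    // It emits (timestamp, value) rows, renamed here to mirror the report.
    def stream(data: String, time: String): DataFrame =
      spark.readStream.format("rate").option("rowsPerSecond", "1").load()
        .selectExpr(s"CAST(value AS STRING) AS $data", s"timestamp AS $time")
        .withWatermark(time, "20 seconds")

    // Same join condition as in the report.
    val cond = expr(
      """data = data1 AND
        |recordTime1 >= recordTime AND
        |recordTime1 <= recordTime + interval 20 seconds""".stripMargin)

    // Returns true if starting the query in update mode raises AnalysisException.
    def rejectsUpdateMode(joinType: String): Boolean = {
      val joined = stream("data", "recordTime")
        .join(stream("data1", "recordTime1"), cond, joinType)
      Try(joined.writeStream.outputMode("update").format("console").start()) match {
        case Failure(_: AnalysisException) => true
        case Failure(other)                => throw other
        case Success(query)                => query.stop(); false
      }
    }

    Seq("inner", "left_outer").foreach { joinType =>
      println(s"$joinType join rejected in update mode: ${rejectsUpdateMode(joinType)}")
    }

    spark.stop()
  }
}
```

Going by the documentation quoted above, both join types should be rejected. The behaviour described in this report corresponds to the left_outer case not being rejected on 2.3.1 and 2.3.2.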