[ https://issues.apache.org/jira/browse/SPARK-45637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim updated SPARK-45637:
---------------------------------
Labels: correctness (was: )
Priority: Blocker (was: Major)
> Time window aggregation in separate streams followed by stream-stream join not returning results
> ------------------------------------------------------------------------------------------------
>
> Key: SPARK-45637
> URL: https://issues.apache.org/jira/browse/SPARK-45637
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.5.0
> Environment: I'm using Spark 3.5.0 on Databricks Runtime 14.1
> Reporter: Andrzej Zera
> Priority: Blocker
> Labels: correctness
>
> According to the documentation update (SPARK-42591) that resulted from SPARK-42376,
> Spark 3.5.0 should support time-window aggregations in two separate streams
> followed by a stream-stream window join:
> [https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995]
> However, I failed to reproduce this example; the query I built doesn't
> return any results:
> {code:python}
> from pyspark.sql.functions import rand
> from pyspark.sql.functions import expr, window, window_time
> spark.conf.set("spark.sql.shuffle.partitions", "1")
> impressions = (
>     spark
>     .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
>     .selectExpr("value AS adId", "timestamp AS impressionTime")
> )
> impressionsWithWatermark = impressions \
>     .selectExpr("adId AS impressionAdId", "impressionTime") \
>     .withWatermark("impressionTime", "10 seconds")
> clicks = (
>     spark
>     .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
>     .where((rand() * 100).cast("integer") < 10)  # 10 out of every 100 impressions result in a click
>     .selectExpr("(value - 10) AS adId", "timestamp AS clickTime")  # -10 so that a click with the same id as an impression is generated later (i.e. delayed data)
>     .where("adId > 0")
> )
> clicksWithWatermark = clicks \
>     .selectExpr("adId AS clickAdId", "clickTime") \
>     .withWatermark("clickTime", "10 seconds")
> clicksWindow = clicksWithWatermark.groupBy(
>     window(clicksWithWatermark.clickTime, "1 minute")
> ).count()
> impressionsWindow = impressionsWithWatermark.groupBy(
>     window(impressionsWithWatermark.impressionTime, "1 minute")
> ).count()
> clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")
> clicksAndImpressions.writeStream \
>     .format("memory") \
>     .queryName("clicksAndImpressions") \
>     .outputMode("append") \
>     .start()
> {code}
>
> My intuition is that I'm getting no results because, to emit results from the
> first stateful operator (the time-window aggregation) in append mode, the watermark
> needs to pass the end timestamp of the window. But once the watermark is past the
> end timestamp of the window, that window is dropped by the second stateful
> operator (the stream-stream join) because it is behind the watermark. Indeed, a
> small hack on the event-time column (adding one minute) between the two stateful
> operators makes it possible to get results:
> {code:python}
> clicksWindow2 = clicksWithWatermark.groupBy(
>     window(clicksWithWatermark.clickTime, "1 minute")
> ).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window")
> impressionsWindow2 = impressionsWithWatermark.groupBy(
>     window(impressionsWithWatermark.impressionTime, "1 minute")
> ).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window")
> clicksAndImpressions2 = clicksWindow2.join(impressionsWindow2, "window_time", "inner")
> clicksAndImpressions2.writeStream \
>     .format("memory") \
>     .queryName("clicksAndImpressions2") \
>     .outputMode("append") \
>     .start()
> {code}
>
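The timing in the reporter's hypothesis can be made concrete with a toy model. The sketch below is plain Python, not PySpark, and every name in it (`surviving_rows`, `WINDOW`, `DELAY`, the batch layout) is hypothetical. It makes several simplifying assumptions. There is a single watermark that trails the maximum event time by 10 seconds and advances only at the end of each micro-batch. An append-mode 1-minute window count emits a window once that watermark reaches the window end. A join then drops any row whose event time (here modeled as `window_time`, the window end minus 1 microsecond) is behind the watermark. The model deliberately ignores how Spark actually propagates watermarks between chained stateful operators, which is exactly the behavior under question. It therefore only illustrates the suspected interaction and does not reproduce Spark's real semantics.

```python
# Toy model of the hypothesis above (hypothetical, NOT Spark's implementation):
# an append-mode 1-minute window count feeding a join, sharing one watermark.
US = 1_000_000           # microseconds per second
WINDOW = 60 * US         # window(..., "1 minute")
DELAY = 10 * US          # withWatermark(..., "10 seconds")


def window_time(start):
    """Mimic window_time(): the window end minus 1 microsecond."""
    return start + WINDOW - 1


def surviving_rows(batches, shift=0):
    """Return event times of aggregated rows that the join would keep.

    `batches` is a list of non-empty micro-batches of event times (in µs).
    `shift` models the '+ INTERVAL 1 MINUTE' workaround.
    """
    watermark = 0      # watermark computed at the end of the previous batch
    counts = {}        # aggregation state: window start -> count
    kept = []
    for batch in batches:
        # 1. Aggregation: add this batch's events to their tumbling windows.
        for ts in batch:
            start = ts - ts % WINDOW
            counts[start] = counts.get(start, 0) + 1
        # 2. Append mode: emit (and evict) windows whose end is <= watermark.
        for start in sorted(s for s in counts if s + WINDOW <= watermark):
            del counts[start]
            event_time = window_time(start) + shift
            # 3. Join: rows whose event time is behind the watermark are late.
            if event_time >= watermark:
                kept.append(event_time)
        # 4. Advance the watermark for the next micro-batch.
        watermark = max(watermark, max(batch) - DELAY)
    return kept


if __name__ == "__main__":
    # One event per second for 150 s, delivered in 10-second micro-batches.
    batches = [[s * US for s in range(10 * k, 10 * k + 10)] for k in range(15)]
    print(surviving_rows(batches))                # []
    print(surviving_rows(batches, shift=WINDOW))  # [119999999, 179999999]
```

In this model the unshifted run can never keep a row. A window is emitted only when `watermark >= end`, and its `window_time` equals `end - 1 µs`, so the window is already behind the watermark when it reaches the join. Shifting the event time forward by one minute, as in the workaround above, places the emitted rows ahead of the watermark, and they survive.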
--
This message was sent by Atlassian Jira
(v8.20.10#820010)