[ https://issues.apache.org/jira/browse/SPARK-48023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842509#comment-17842509 ]
Scott Schenkein edited comment on SPARK-48023 at 4/30/24 9:32 PM:
------------------------------------------------------------------

1. My apologies, I updated the sample code with this import statement. It should work properly if you add it:
{code:java}
from pyspark.sql.functions import window, lit, col, current_timestamp
{code}
2. The stream-stream join isn't written to disk, just the source data. The rows only fail to be produced if there is a stream-stream join with a watermark fed into the windowed dropDuplicates.

I very much appreciate your help; please let me know if there is anything else you need.

> Stream-stream join with windowed+watermarked dropDuplicates suppresses all
> rows
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-48023
>                 URL: https://issues.apache.org/jira/browse/SPARK-48023
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0
>            Reporter: Scott Schenkein
>            Priority: Major
>
> When applying a streaming dropDuplicates-with-watermark to a
> self-referential stream-stream left join with a watermark, the dropDuplicates
> drops all rows. If the watermark is eliminated from the dropDuplicates, the
> query behaves as expected.
>
> The code below demonstrates the error:
>
> {code:java}
> import datetime
> import time
> from pyspark.sql.functions import window, lit, col, current_timestamp  # Added on 4/30 in response to difficulty reproducing
> #
> # 1. Generate the test data
> #
> size = 1000
> step_secs = 300
> event_offset_secs = 240
>
> # Skip some ids so the left join finds no match for them
> skips = set()
> skips.add(3)
> skips.add(18)
> skips.add(800)
>
> def lit_ts(secs):
>     return datetime.datetime.fromtimestamp(secs)
>
> data = []
> base_time = time.time()
> for x in range(size):
>     ts = base_time + (step_secs * x)
>     data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
>     if x not in skips:
>         data.append(
>             {
>                 "event_id": f"two_{x}",
>                 "join_id": x,
>                 "ts": lit_ts(ts - event_offset_secs),
>             }
>         )
>
> # Add duplicates to validate the dropDuplicates
> for i in range(len(data)):
>     data.append(data[i].copy())
>
> #
> # 2. Write the results so we can stream
> #
> path = "/tmp/bugtest"
> df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP")
> df.repartition(1).write.format("delta").mode("overwrite").save(path)
> df = spark.read.format("delta").load(path)
> df.createOrReplaceTempView("test_data")
>
> #
> # 3. Define the test query
> #
> sql = """
> with also_test_data as (
>     select * from test_data
>     where event_id like 'two%'
> )
> select td.* from test_data td
> left join also_test_data on (
>     td.join_id = also_test_data.join_id
>     and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
>     and also_test_data.ts <= td.ts
> )
> where td.event_id like 'one%'
> -- rows where left-join does not match
> and also_test_data.event_id is NULL
> """
>
> #
> # 4. Run it non-streaming w/ non-deduplicated data to validate results
> #
> res = spark.sql(sql)
> print("Static query")
> res.show(truncate=False)
> # Static query
> # +--------+-------+-------------------------+
> # |event_id|join_id|ts                       |
> # +--------+-------+-------------------------+
> # |one_3   |3      |2024-04-27 17:36:54.97927|
> # |one_18  |18     |2024-04-27 18:51:54.97927|
> # |one_800 |800    |2024-04-30 12:01:54.97927|
> # |one_3   |3      |2024-04-27 17:36:54.97927|
> # |one_18  |18     |2024-04-27 18:51:54.97927|
> # |one_800 |800    |2024-04-30 12:01:54.97927|
> # +--------+-------+-------------------------+
>
> #
> # 5. Run it as a stream with no dropDuplicates
> #
> def write_stream(res):
>     (
>         res.writeStream.outputMode("append")
>         .trigger(availableNow=True)
>         .format("console")
>         .start()
>         .awaitTermination()
>     )
>
> sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> write_stream(res)
> # Batch: 1
> # -------------------------------------------
> # +--------+-------+--------------------+
> # |event_id|join_id|                  ts|
> # +--------+-------+--------------------+
> # | one_800|    800|2024-04-30 12:01:...|
> # | one_800|    800|2024-04-30 12:01:...|
> # |   one_3|      3|2024-04-27 17:36:...|
> # |   one_3|      3|2024-04-27 17:36:...|
> # |  one_18|     18|2024-04-27 18:51:...|
> # |  one_18|     18|2024-04-27 18:51:...|
> # +--------+-------+--------------------+
>
> #
> # 6. Run it as a stream with dropDuplicates, but no extra watermark
> #
> sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> res = (
>     res.select(window("ts", "30 minutes"), "*")
>     .dropDuplicates(["event_id"])
>     .drop("window")
> )
> write_stream(res)
> # -------------------------------------------
> # Batch: 1
> # -------------------------------------------
> # +--------+-------+--------------------+
> # |event_id|join_id|                  ts|
> # +--------+-------+--------------------+
> # | one_800|    800|2024-04-30 12:01:...|
> # |  one_18|     18|2024-04-27 18:51:...|
> # |   one_3|      3|2024-04-27 17:36:...|
> # +--------+-------+--------------------+
>
> #
> # 7. Run it as a stream with dropDuplicates, using a watermark
> #    THIS is where we see the error
> #
> sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> res = (
>     res.select(window("ts", "30 minutes"), "*")
>     .withWatermark("window", "30 minutes")
>     .dropDuplicates(["event_id"])
>     .drop("window")
> )
> write_stream(res)
> print("END")
> # -------------------------------------------
> # Batch: 0
> # -------------------------------------------
> # +--------+-------+---+
> # |event_id|join_id| ts|
> # +--------+-------+---+
> # +--------+-------+---+
> # END{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
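For readers following the reproduction, a minimal pure-Python sketch of the two mechanisms involved in steps 6 and 7: the 30-minute tumbling-window assignment produced by `window("ts", "30 minutes")`, and the append-mode watermark cutoff that decides which rows a stateful operator may still emit. This is a simplified model, not Spark's implementation; the helper names `tumbling_window` and `survives_watermark` are mine.

```python
from datetime import datetime, timedelta

def tumbling_window(ts: datetime, duration: timedelta) -> tuple:
    """Assign an event to its tumbling window [start, start + duration),
    mimicking Spark's window(ts, "30 minutes"). Windows are aligned from
    midnight here, which matches Spark's epoch alignment for any duration
    that evenly divides a day."""
    day_start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    n = int((ts - day_start) // duration)          # which window of the day
    start = day_start + n * duration
    return (start, start + duration)

def survives_watermark(event_time: datetime,
                       max_event_time_seen: datetime,
                       delay: timedelta) -> bool:
    """Simplified watermark rule: the watermark is the maximum event time
    observed so far minus the configured delay, and append-mode stateful
    operators drop rows older than it."""
    return event_time >= max_event_time_seen - delay

# An event at 12:10 lands in the [12:00, 12:30) window.
w = tumbling_window(datetime(2024, 4, 30, 12, 10), timedelta(minutes=30))
```

The repro's symptom is that once a *second* watermark is declared on the derived `window` column (step 7), no rows at all pass this cutoff, whereas the model above would still let recent rows through.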