[ https://issues.apache.org/jira/browse/SPARK-48023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Scott Schenkein updated SPARK-48023:
------------------------------------
Description:

When applying a streaming dropDuplicates-with-watermark on top of a self-referential stream-stream left join that carries its own watermark, the dropDuplicates stage drops all rows. If the watermark is removed from the dropDuplicates stage, the query behaves as expected.

The code below demonstrates the error (imports added so it runs as a standalone PySpark snippet; it assumes an active {{spark}} session):

{code:python}
import datetime
import time

from pyspark.sql.functions import window

#
# 1. Generate the test data
#
size = 1000
step_secs = 300
event_offset_secs = 240

# A few ids whose "one" rows get no "two" partner, so they survive the
# anti-join filter below
skips = {3, 18, 800}

def lit_ts(secs):
    return datetime.datetime.fromtimestamp(secs)

data = []
base_time = time.time()
for x in range(size):
    ts = base_time + (step_secs * x)
    # join_id as str to match the string column in the schema below
    data.append({"event_id": f"one_{x}", "join_id": str(x), "ts": lit_ts(ts)})
    if x not in skips:
        data.append(
            {
                "event_id": f"two_{x}",
                "join_id": str(x),
                "ts": lit_ts(ts - event_offset_secs),
            }
        )

# Add duplicates to validate the dropDuplicates
for i in range(len(data)):
    data.append(data[i].copy())

#
# 2. Write the results so we can stream
#
path = "/tmp/bugtest"
df = spark.createDataFrame(
    data, schema="event_id string, join_id string, ts TIMESTAMP"
)
df.repartition(1).write.format("delta").mode("overwrite").save(path)
df = spark.read.format("delta").load(path)
df.createOrReplaceTempView("test_data")

#
# 3. Define the test query
#
sql = """
with also_test_data as (
    select * from test_data
    where event_id like 'two%'
)
select td.*
from test_data td
left join also_test_data on (
    td.join_id = also_test_data.join_id
    and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
    and also_test_data.ts <= td.ts
)
where td.event_id like 'one%'
and also_test_data.event_id is NULL
"""

#
# 4. Run it non-streaming with non-deduplicated data to validate results
#
res = spark.sql(sql)
print("Static query")
res.show(truncate=False)
# Static query
# +--------+-------+-------------------------+
# |event_id|join_id|ts                       |
# +--------+-------+-------------------------+
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# +--------+-------+-------------------------+

#
# 5. Run it as a stream with no dropDuplicates
#
def write_stream(res):
    (
        res.writeStream.outputMode("append")
        .trigger(availableNow=True)
        .format("console")
        .start()
        .awaitTermination()
    )

sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
write_stream(res)
# Batch: 1
# -------------------------------------------
# +--------+-------+--------------------+
# |event_id|join_id|                  ts|
# +--------+-------+--------------------+
# | one_800|    800|2024-04-30 12:01:...|
# | one_800|    800|2024-04-30 12:01:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |  one_18|     18|2024-04-27 18:51:...|
# +--------+-------+--------------------+
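# Note: steps 6 and 7 below are identical except that step 7 adds
# .withWatermark("window", "30 minutes") before the dropDuplicates;
# that extra watermark is what suppresses every row.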
#
# 6. Run it as a stream with dropDuplicates, but no extra watermark
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    res.select(window("ts", "30 minutes"), "*")
    .dropDuplicates(["event_id"])
    .drop("window")
)
write_stream(res)
# -------------------------------------------
# Batch: 1
# -------------------------------------------
# +--------+-------+--------------------+
# |event_id|join_id|                  ts|
# +--------+-------+--------------------+
# | one_800|    800|2024-04-30 12:01:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |   one_3|      3|2024-04-27 17:36:...|
# +--------+-------+--------------------+

#
# 7. Run it as a stream with dropDuplicates, using a watermark.
#    THIS is where we see the error
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    res.select(window("ts", "30 minutes"), "*")
    .withWatermark("window", "30 minutes")
    .dropDuplicates(["event_id"])
    .drop("window")
)
write_stream(res)
print("END")
# -------------------------------------------
# Batch: 0
# -------------------------------------------
# +--------+-------+---+------------+
# |event_id|join_id| ts|detection_ts|
# +--------+-------+---+------------+
# +--------+-------+---+------------+
# END
{code}
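For clarity, the operator chain that the SQL above builds can also be sketched with the DataFrame API alone. This is a minimal sketch, not part of the original reproduction: the {{r_*}} column aliases and the explicit {{leftOuter}} join expression are assumptions introduced here, and it reuses the {{/tmp/bugtest}} Delta path written above.

{code:python}
from pyspark.sql.functions import col, expr, window

# Single watermarked stream, self-joined below (mirrors the temp view above)
events = (
    spark.readStream.format("delta").load("/tmp/bugtest")
    .withWatermark("ts", "15 minutes")
)

ones = events.filter(col("event_id").like("one%"))
# Rename the right side so the join condition is unambiguous (names assumed)
twos = events.filter(col("event_id").like("two%")).select(
    col("event_id").alias("r_event_id"),
    col("join_id").alias("r_join_id"),
    col("ts").alias("r_ts"),
)

# Self-referential stream-stream left join with a time-bounded condition,
# filtered to rows with no match (the anti-join the SQL expresses)
anti = (
    ones.join(
        twos,
        expr(
            "join_id = r_join_id "
            "AND r_ts >= ts - INTERVAL 15 MINUTES "
            "AND r_ts <= ts"
        ),
        "leftOuter",
    )
    .where(col("r_event_id").isNull())
    .select("event_id", "join_id", "ts")
)

# Works: windowed dropDuplicates with no second watermark (step 6)
works = (
    anti.select(window("ts", "30 minutes"), "*")
    .dropDuplicates(["event_id"])
    .drop("window")
)

# Drops every row: the extra watermark on the window column (step 7)
drops_all = (
    anti.select(window("ts", "30 minutes"), "*")
    .withWatermark("window", "30 minutes")
    .dropDuplicates(["event_id"])
    .drop("window")
)
{code}

If this sketch is faithful to the SQL plan, streaming {{works}} should emit the three one_* rows once each, while {{drops_all}} should emit an empty batch, mirroring steps 6 and 7 above.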
> Stream-stream join with windowed drop-duplicates suppresses all rows
> --------------------------------------------------------------------
>
>                 Key: SPARK-48023
>                 URL: https://issues.apache.org/jira/browse/SPARK-48023
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0
>            Reporter: Scott Schenkein
>            Priority: Major
>
> When applying a streaming dropDuplicates-with-watermark to a
> self-referential stream-stream left-join with watermark, the dropDuplicates
> drops all rows. If the watermark is eliminated from the dropDuplicates, the
> query behaves as expected.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org