[ https://issues.apache.org/jira/browse/SPARK-48023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842509#comment-17842509 ]

Scott Schenkein edited comment on SPARK-48023 at 4/30/24 9:33 PM:
------------------------------------------------------------------

1. My apologies, I have updated the sample code with the import statement 
below. The example should run properly once you add it:

 
{code:java}
from pyspark.sql.functions import window, lit, col, current_timestamp {code}
2. The stream-stream join result isn't written to disk, only the source data 
is. The rows fail to appear only when a watermarked stream-stream join is fed 
into the windowed dropDuplicates; a condensed sketch of that shape follows.
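
For reference, here is a condensed sketch of that failing shape written purely 
with the DataFrame API. The variable names and aliases are illustrative only, 
and it assumes the spark session and delta path from the reproduction in the 
issue description; it is not the exact code I ran.

{code:java}
# Condensed sketch only -- assumes the spark session and the delta path
# used by the reproduction in the issue description
from pyspark.sql.functions import col, expr, window

src = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
ones = src.where(col("event_id").like("one%")).alias("one")
twos = src.where(col("event_id").like("two%")).alias("two")

# Watermarked stream-stream left join, same shape as the SQL in the description
joined = (
    ones.join(
        twos,
        expr(
            "one.join_id = two.join_id "
            "AND two.ts >= one.ts - INTERVAL 15 MINUTES "
            "AND two.ts <= one.ts"
        ),
        "left",
    )
    .where(col("two.event_id").isNull())  # keep only the unmatched 'one_' rows
    .select("one.*")
)

# Windowed dropDuplicates fed by the join above; adding the watermark on the
# window column is the step that suppresses every row
deduped = (
    joined.select(window("ts", "30 minutes"), "*")
    .withWatermark("window", "30 minutes")
    .dropDuplicates(["event_id"])
    .drop("window")
)
{code}
Dropping the withWatermark("window", ...) line from that last step is exactly 
what makes the rows reappear, matching steps 6 and 7 of the reproduction.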

 

I very much appreciate your help. Please let me know if there is anything else 
you need.

@[~siying] 



> Stream-stream join with windowed+watermarked dropDuplicates suppresses all 
> rows
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-48023
>                 URL: https://issues.apache.org/jira/browse/SPARK-48023
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0
>            Reporter: Scott Schenkein
>            Priority: Major
>
> When a streaming dropDuplicates-with-watermark is applied to a 
> self-referential stream-stream left join that already carries a watermark, 
> the dropDuplicates drops all rows. If the watermark is removed from the 
> dropDuplicates step, the query behaves as expected.
>  
> The code below demonstrates the error:
>  
> {code:java}
> # Import added on 4/30 in response to difficulty reproducing
> from pyspark.sql.functions import window, lit, col, current_timestamp
> # Standard-library imports used by the data generator below
> import datetime
> import time
> # 
> # 1. Generate the test data
> #
> size = 1000
> step_secs = 300
> event_offset_secs = 240
> # Skip the matching 'two_' event for a few ids so the left join has unmatched rows
> skips = set()
> skips.add(3)
> skips.add(18)
> skips.add(800)
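> # Convert epoch seconds to a datetime for the ts column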
> def lit_ts(secs):
>     return datetime.datetime.fromtimestamp(secs)
> data = []
> base_time = time.time()
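> # Each 'one_<x>' event gets a matching 'two_<x>' event 4 minutes earlier, except for the skipped ids above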
> for x in range(size):
>     ts = base_time + (step_secs * x)
>     data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
>     if x not in skips:
>         data.append(
>             {
>                 "event_id": f"two_{x}",
>                 "join_id": x,
>                 "ts": lit_ts(ts - event_offset_secs),
>             }
>         )
> # Add duplicates to validate the dropDuplicates
> for i in range(len(data)):
>    data.append(data[i].copy()) 
> #
> # 2. Write the results so we can stream
> #
> path = "/tmp/bugtest"
> df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP")
> df.repartition(1).write.format("delta").mode("overwrite").save(path)
> df = spark.read.format("delta").load(path)
> df.createOrReplaceTempView("test_data")
> #
> # 3. Define the test query
> #
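> # Keep the 'one_' rows that have no matching 'two_' row within the preceding 15 minutes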
> sql = """
> with also_test_data as (
>    select * from test_data
>    where event_id like 'two%'
> )
> select td.* from test_data td
> left join also_test_data on (
>    td.join_id = also_test_data.join_id
>    and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
>    and also_test_data.ts <= td.ts
> )
> where td.event_id like 'one%'
> -- rows where left-join does not match
> and also_test_data.event_id is NULL 
> """
> #
> # 4. Run it non-streaming w/non-deDuplicated to validate results
> #
> res = spark.sql(sql)
> print("Static query")
> res.show(truncate=False)
> # Static query
> # +--------+-------+-------------------------+
> # |event_id|join_id|ts                       |
> # +--------+-------+-------------------------+
> # |one_3   |3      |2024-04-27 17:36:54.97927|
> # |one_18  |18     |2024-04-27 18:51:54.97927|
> # |one_800 |800    |2024-04-30 12:01:54.97927|
> # |one_3   |3      |2024-04-27 17:36:54.97927|
> # |one_18  |18     |2024-04-27 18:51:54.97927|
> # |one_800 |800    |2024-04-30 12:01:54.97927|
> # +--------+-------+-------------------------+
> #
> # 5. Run it as a stream with no-dropDuplicates
> #
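> # Run the streaming query once over all currently available data and print each batch to the console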
> def write_stream(res):
>     (   
>         res.writeStream.outputMode("append")
>         .trigger(availableNow=True)
>         .format("console")
>         .start()
>         .awaitTermination()
>     )
> sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> write_stream(res)
> # Batch: 1
> # -------------------------------------------
> # +--------+-------+--------------------+
> # |event_id|join_id|                  ts|
> # +--------+-------+--------------------+
> # | one_800|    800|2024-04-30 12:01:...|
> # | one_800|    800|2024-04-30 12:01:...|
> # |   one_3|      3|2024-04-27 17:36:...|
> # |   one_3|      3|2024-04-27 17:36:...|
> # |  one_18|     18|2024-04-27 18:51:...|
> # |  one_18|     18|2024-04-27 18:51:...|
> # +--------+-------+--------------------+
> #
> # 6. Run it as a stream with dropDuplicates, but no extra watermark
> #
> sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> res = (
>     (res.select(window("ts", "30 minutes"), "*"))
>     .dropDuplicates(["event_id"]).drop("window")
> )
> write_stream(res)
> # -------------------------------------------                                 
> # Batch: 1
> # -------------------------------------------
> # +--------+-------+--------------------+
> # |event_id|join_id|                  ts|
> # +--------+-------+--------------------+
> # | one_800|    800|2024-04-30 12:01:...|
> # |  one_18|     18|2024-04-27 18:51:...|
> # |   one_3|      3|2024-04-27 17:36:...|
> # +--------+-------+--------------------+
> #
> # 7. Run it as a stream with dropDuplicates, using a watermark
> #     THIS is where we see the error
> #
> sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> res = (
>     (res.select(window("ts", "30 minutes"), "*"))
>     .withWatermark("window", "30 minutes")  # the only difference from step 6
>     .dropDuplicates(["event_id"]).drop("window")
> )
> write_stream(res) 
> print("END")
> # -------------------------------------------                                 
>  
> # Batch: 0
> # -------------------------------------------
> # +--------+-------+---+
> # |event_id|join_id| ts|
> # +--------+-------+---+
> # +--------+-------+---+
> # END{code}
>  
>  


