[
https://issues.apache.org/jira/browse/SPARK-48023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Scott Schenkein updated SPARK-48023:
------------------------------------
Description:
When applying a streaming dropDuplicates-with-watermark to a self-referential
stream-stream left-join with watermark, the dropDuplicates drops all rows. If
the watermark is eliminated from the dropDuplicates, the query behaves as
expected.
The code below demonstrates the error:
{code:python}
#
# 1. Generate the test data
#
import datetime
import time

size = 1000
step_secs = 300
event_offset_secs = 240

# join_ids that get no "two_*" partner row, so the left join below finds no
# match for their "one_*" row
skips = {3, 18, 800}


def lit_ts(secs):
    """Convert epoch seconds ``secs`` into a ``datetime`` via ``fromtimestamp``."""
    return datetime.datetime.fromtimestamp(secs)


data = []
base_time = time.time()
for x in range(size):
    # "one_*" events are spaced step_secs apart, starting now
    ts = base_time + (step_secs * x)
    data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
    if x not in skips:
        # the partner "two_*" event precedes its "one_*" event by
        # event_offset_secs
        data.append(
            {
                "event_id": f"two_{x}",
                "join_id": x,
                "ts": lit_ts(ts - event_offset_secs),
            }
        )

# Add duplicates to validate the dropDuplicates
# (the list comprehension snapshots the rows before they are appended)
data += [row.copy() for row in data]
#
# 2. Write the results so we can stream
#
path = "/tmp/bugtest"
df = spark.createDataFrame(
    data,
    schema="event_id string, join_id string, ts TIMESTAMP",
)
# Single partition, overwrite: every run starts from one fresh delta table
df.repartition(1).write.format("delta").mode("overwrite").save(path)
# Re-read as a batch DataFrame and expose it to SQL as test_data
df = spark.read.format("delta").load(path)
df.createOrReplaceTempView("test_data")
#
# 3. Define the test query
#
# Anti-join on the test_data view against itself: keep each 'one%' row for
# which no 'two%' row exists with the same join_id and a ts inside the
# 15 minutes up to and including the 'one%' row's ts.
sql = """
with also_test_data as (
select * from test_data
where event_id like 'two%'
)
select td.* from test_data
td left join also_test_data on (
td.join_id = also_test_data.join_id
and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
and also_test_data.ts <= td.ts
)
where td.event_id like 'one%'
-- rows where left-join does not match
and also_test_data.event_id is NULL
"""
#
# 4. Run it non-streaming w/non-deDuplicated to validate results
#
# At this point test_data is still the batch view registered in step 2, so
# this is an ordinary static query that serves as the reference result.
res = spark.sql(sql)
print("Static query")
res.show(truncate=False)
# Each unmatched row is listed twice because step 1 duplicated every input row.
# Static query
# +--------+-------+-------------------------+
# |event_id|join_id|ts |
# +--------+-------+-------------------------+
# |one_3 |3 |2024-04-27 17:36:54.97927|
# |one_18 |18 |2024-04-27 18:51:54.97927|
# |one_800 |800 |2024-04-30 12:01:54.97927|
# |one_3 |3 |2024-04-27 17:36:54.97927|
# |one_18 |18 |2024-04-27 18:51:54.97927|
# |one_800 |800 |2024-04-30 12:01:54.97927|
# +--------+-------+-------------------------+
#
# 5. Run it as a stream with no-dropDuplicates
#
def write_stream(res):
    """Write ``res`` to the console sink as an append-mode streaming query.

    The query is started with the ``availableNow`` trigger and this function
    blocks on ``awaitTermination()`` before returning.

    Args:
        res: the streaming DataFrame to write.
    """
    query = (
        res.writeStream.outputMode("append")
        .trigger(availableNow=True)
        .format("console")
        .start()
    )
    query.awaitTermination()
# Re-register test_data as a watermarked stream; the SQL reads the view twice,
# so both sides of the self-join come from this same source.
sdf = (
    spark.readStream.format("delta")
    .load(path)
    .withWatermark("ts", "15 minutes")
)
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
write_stream(res)
# Batch: 1
# -------------------------------------------
# +--------+-------+--------------------+
# |event_id|join_id| ts|
# +--------+-------+--------------------+
# | one_800| 800|2024-04-30 12:01:...|
# | one_800| 800|2024-04-30 12:01:...|
# | one_3| 3|2024-04-27 17:36:...|
# | one_3| 3|2024-04-27 17:36:...|
# | one_18| 18|2024-04-27 18:51:...|
# | one_18| 18|2024-04-27 18:51:...|
# +--------+-------+--------------------+
#
# 6. Run it as a stream with dropDuplicates, but no extra watermark
#
# NOTE(review): `window` is never imported in this snippet; presumably it is
# pyspark.sql.functions.window -- confirm before running.
sdf = (
    spark.readStream.format("delta")
    .load(path)
    .withWatermark("ts", "15 minutes")
)
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
# Add a 30-minute window column, de-duplicate on event_id, then drop the
# helper column again so the output schema matches step 5.
res = (
    res.select(window("ts", "30 minutes"), "*")
    .dropDuplicates(["event_id"])
    .drop("window")
)
write_stream(res)
# -------------------------------------------
# Batch: 1
# -------------------------------------------
# +--------+-------+--------------------+
# |event_id|join_id| ts|
# +--------+-------+--------------------+
# | one_800| 800|2024-04-30 12:01:...|
# | one_18| 18|2024-04-27 18:51:...|
# | one_3| 3|2024-04-27 17:36:...|
# +--------+-------+--------------------+
#
# 7. Run it as a stream with dropDuplicates, using a watermark
# THIS is where we see the error
#
# NOTE(review): `window` is never imported in this snippet; presumably it is
# pyspark.sql.functions.window -- confirm before running.
sdf = (
    spark.readStream.format("delta")
    .load(path)
    .withWatermark("ts", "15 minutes")
)
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
# Same pipeline as step 6 except for the additional 30-minute watermark on
# the window column placed before dropDuplicates.
res = (
    res.select(window("ts", "30 minutes"), "*")
    .withWatermark("window", "30 minutes")
    .dropDuplicates(["event_id"])
    .drop("window")
)
write_stream(res)
print("END")
# NOTE(review): the pasted output below shows a detection_ts column that this
# snippet never creates; presumably it was copied from a different run --
# confirm with the reporter.
# -------------------------------------------
# Batch: 0
# -------------------------------------------
# +--------+-------+---+------------+
# |event_id|join_id| ts|detection_ts|
# +--------+-------+---+------------+
# +--------+-------+---+------------+
# END{code}
was:
When applying a streaming dropDuplicates-with-watermark to a self-referential
stream-stream left-join with watermark, the dropDuplicates drops all rows. If
the watermark is eliminated from the dropDuplicates, the query behaves as
expected.
The code below demonstrates the error:
{code:java}
#
# 1. Generate the test data
#
size = 1000
step_secs = 300
event_offset_secs = 240
# Add some lines to not left join
skips = set()
skips.add(3)
skips.add(18)
skips.add(800)
def lit_ts(secs):
return datetime.datetime.fromtimestamp(secs)
data = []
base_time = time.time()
for x in range(size):
ts = base_time + (step_secs * x)
data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
if x not in skips:
data.append(
{
"event_id": f"two_{x}",
"join_id": x,
"ts": lit_ts(ts - event_offset_secs),
}
)
# Add duplicates to validate the dropDuplicates
for i in range(len(data)):
data.append(data[i].copy())
#
# 2. Write the results so we can stream
#
path = "/tmp/bugtest"
df = spark.createDataFrame(data, schema="event_id string, join_id string, ts
TIMESTAMP")
df.repartition(1).write.format("delta").mode("overwrite").save(path)
df = spark.read.format("delta").load(path)
df.createOrReplaceTempView("test_data")
#
# 3. Define the test query
#
sql = """
with also_test_data as (
select * from test_data
where event_id like 'two%'
)
select td.* from test_data
td left join also_test_data on (
td.join_id = also_test_data.join_id
and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
and also_test_data.ts <= td.ts
)
where td.event_id like 'one%'
and also_test_data.event_id is NULL
"""
#
# 4. Run it non-streaming w/non-deDuplicated to validate results
#
res = spark.sql(sql)
print("Static query")
res.show(truncate=False)
# Static query
# +--------+-------+-------------------------+
# |event_id|join_id|ts |
# +--------+-------+-------------------------+
# |one_3 |3 |2024-04-27 17:36:54.97927|
# |one_18 |18 |2024-04-27 18:51:54.97927|
# |one_800 |800 |2024-04-30 12:01:54.97927|
# |one_3 |3 |2024-04-27 17:36:54.97927|
# |one_18 |18 |2024-04-27 18:51:54.97927|
# |one_800 |800 |2024-04-30 12:01:54.97927|
# +--------+-------+-------------------------+
#
# 5. Run it as a stream with no-dropDuplicates
#
def write_stream(res):
(
res.writeStream.outputMode("append")
.trigger(availableNow=True)
.format("console")
.start()
.awaitTermination()
)
sdf = spark.readStream.format("delta").load(path).withWatermark('ts', '15
minutes')
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
write_stream(res)
# Batch: 1
# -------------------------------------------
# +--------+-------+--------------------+
# |event_id|join_id| ts|
# +--------+-------+--------------------+
# | one_800| 800|2024-04-30 12:01:...|
# | one_800| 800|2024-04-30 12:01:...|
# | one_3| 3|2024-04-27 17:36:...|
# | one_3| 3|2024-04-27 17:36:...|
# | one_18| 18|2024-04-27 18:51:...|
# | one_18| 18|2024-04-27 18:51:...|
# +--------+-------+--------------------+
#
# 6. Run it as a stream with dropDuplicates, but no extra watermark
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15
minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
(res.select(window("ts", "30 minutes"), "*"))
.dropDuplicates(["event_id"]).drop("window")
)
write_stream(res)
# -------------------------------------------
# Batch: 1
# -------------------------------------------
# +--------+-------+--------------------+
# |event_id|join_id| ts|
# +--------+-------+--------------------+
# | one_800| 800|2024-04-30 12:01:...|
# | one_18| 18|2024-04-27 18:51:...|
# | one_3| 3|2024-04-27 17:36:...|
# +--------+-------+--------------------+
#
# 7. Run it as a stream with dropDuplicates, using a watermark
# THIS is where we see the error
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15
minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
(res.select(window("ts", "30 minutes"), "*"))
.withWatermark("window", '30 minutes')
.dropDuplicates(["event_id"]).drop("window")
)
write_stream(res)
print("END")
# -------------------------------------------
# Batch: 0
# -------------------------------------------
# +--------+-------+---+------------+
# |event_id|join_id| ts|detection_ts|
# +--------+-------+---+------------+
# +--------+-------+---+------------+
# END{code}
> Stream-stream join with windowed+watermarked drop-duplicates suppresses all
> rows
> --------------------------------------------------------------------------------
>
> Key: SPARK-48023
> URL: https://issues.apache.org/jira/browse/SPARK-48023
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.5.0
> Reporter: Scott Schenkein
> Priority: Major
>
> When applying a streaming dropDuplicates-with-watermark to a
> self-referential stream-stream left-join with watermark, the dropDuplicates
> drops all rows. If the watermark is eliminated from the dropDuplicates, the
> query behaves as expected.
>
> The code below demonstrates the error:
>
> {code:java}
> #
> # 1. Generate the test data
> #
> size = 1000
> step_secs = 300
> event_offset_secs = 240
> # Add some lines to not left join
> skips = set()
> skips.add(3)
> skips.add(18)
> skips.add(800)
> def lit_ts(secs):
> return datetime.datetime.fromtimestamp(secs)
> data = []
> base_time = time.time()
> for x in range(size):
> ts = base_time + (step_secs * x)
> data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
> if x not in skips:
> data.append(
> {
> "event_id": f"two_{x}",
> "join_id": x,
> "ts": lit_ts(ts - event_offset_secs),
> }
> )
> # Add duplicates to validate the dropDuplicates
> for i in range(len(data)):
> data.append(data[i].copy())
> #
> # 2. Write the results so we can stream
> #
> path = "/tmp/bugtest"
> df = spark.createDataFrame(data, schema="event_id string, join_id string, ts
> TIMESTAMP")
> df.repartition(1).write.format("delta").mode("overwrite").save(path)
> df = spark.read.format("delta").load(path)
> df.createOrReplaceTempView("test_data")
> #
> # 3. Define the test query
> #
> sql = """
> with also_test_data as (
> select * from test_data
> where event_id like 'two%'
> )
> select td.* from test_data
> td left join also_test_data on (
> td.join_id = also_test_data.join_id
> and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
> and also_test_data.ts <= td.ts
> )
> where td.event_id like 'one%'
> -- rows where left-join does not match
> and also_test_data.event_id is NULL
> """
> #
> # 4. Run it non-streaming w/non-deDuplicated to validate results
> #
> res = spark.sql(sql)
> print("Static query")
> res.show(truncate=False)
> # Static query
> # +--------+-------+-------------------------+
> # |event_id|join_id|ts |
> # +--------+-------+-------------------------+
> # |one_3 |3 |2024-04-27 17:36:54.97927|
> # |one_18 |18 |2024-04-27 18:51:54.97927|
> # |one_800 |800 |2024-04-30 12:01:54.97927|
> # |one_3 |3 |2024-04-27 17:36:54.97927|
> # |one_18 |18 |2024-04-27 18:51:54.97927|
> # |one_800 |800 |2024-04-30 12:01:54.97927|
> # +--------+-------+-------------------------+
> #
> # 5. Run it as a stream with no-dropDuplicates
> #
> def write_stream(res):
> (
> res.writeStream.outputMode("append")
> .trigger(availableNow=True)
> .format("console")
> .start()
> .awaitTermination()
> )
> sdf = spark.readStream.format("delta").load(path).withWatermark('ts', '15
> minutes')
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> write_stream(res)
> # Batch: 1
> # -------------------------------------------
> # +--------+-------+--------------------+
> # |event_id|join_id| ts|
> # +--------+-------+--------------------+
> # | one_800| 800|2024-04-30 12:01:...|
> # | one_800| 800|2024-04-30 12:01:...|
> # | one_3| 3|2024-04-27 17:36:...|
> # | one_3| 3|2024-04-27 17:36:...|
> # | one_18| 18|2024-04-27 18:51:...|
> # | one_18| 18|2024-04-27 18:51:...|
> # +--------+-------+--------------------+
> #
> # 6. Run it as a stream with dropDuplicates, but no extra watermark
> #
> sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15
> minutes")
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> res = (
> (res.select(window("ts", "30 minutes"), "*"))
> .dropDuplicates(["event_id"]).drop("window")
> )
> write_stream(res)
> # -------------------------------------------
> # Batch: 1
> # -------------------------------------------
> # +--------+-------+--------------------+
> # |event_id|join_id| ts|
> # +--------+-------+--------------------+
> # | one_800| 800|2024-04-30 12:01:...|
> # | one_18| 18|2024-04-27 18:51:...|
> # | one_3| 3|2024-04-27 17:36:...|
> # +--------+-------+--------------------+
> #
> # 7. Run it as a stream with dropDuplicates, using a watermark
> # THIS is where we see the error
> #
> sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15
> minutes")
> sdf.createOrReplaceTempView("test_data")
> res = spark.sql(sql)
> res = (
> (res.select(window("ts", "30 minutes"), "*"))
> .withWatermark("window", '30 minutes')
> .dropDuplicates(["event_id"]).drop("window")
> )
> write_stream(res)
> print("END")
> # -------------------------------------------
>
> # Batch: 0
> # -------------------------------------------
> # +--------+-------+---+------------+
> # |event_id|join_id| ts|detection_ts|
> # +--------+-------+---+------------+
> # +--------+-------+---+------------+
> # END{code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]