Richard Swinbank created SPARK-48992:
----------------------------------------
Summary: applyInPandas does not respect streaming watermark
Key: SPARK-48992
URL: https://issues.apache.org/jira/browse/SPARK-48992
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.5.0
Environment: Azure Databricks runtime 14.3 LTS
Reporter: Richard Swinbank
When I use GroupedData.applyInPandas to implement aggregation in a streaming
query, it fails to respect a watermark specified using DataFrame.withWatermark.
This query reproduces the behaviour I'm seeing:
{code:python}
from pyspark.sql.functions import window
from typing import Tuple
import pandas as pd

df_source_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 3)
    .load()
    .withColumn("bucket", window("timestamp", "10 seconds").end)
)

def my_function(
    key: Tuple[str], df: pd.DataFrame
) -> pd.DataFrame:
    return pd.DataFrame({"bucket": [key[0]], "count": [df.shape[0]]})

df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .applyInPandas(my_function, "bucket TIMESTAMP, count INT")
)

display(df)
{code}
I expect the output of the query to contain one row per {{bucket}} value, but a
new row is emitted for each incoming microbatch.
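One way to quantify this is to collect the output rows and count how often each bucket appears. The helper below is a plain-Python sketch and not part of the original query: the function name and the sample rows are hypothetical, and it assumes the output has already been collected as (bucket, count) pairs.

```python
from collections import Counter

def buckets_emitted_more_than_once(rows):
    """Return {bucket: times_emitted} for buckets that appear in more than one row."""
    emitted = Counter(bucket for bucket, _count in rows)
    return {bucket: n for bucket, n in emitted.items() if n > 1}

# Hypothetical collected output; the values are made up for illustration.
sample_rows = [
    ("2024-07-24 10:00:10", 3),
    ("2024-07-24 10:00:10", 2),
    ("2024-07-24 10:00:20", 1),
]

repeated = buckets_emitted_more_than_once(sample_rows)
print(repeated)
```

An empty dictionary would mean one row per bucket. Any entry means that bucket was emitted more than once.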
In contrast, a built-in aggregate behaves as expected. For example:
{code:python}
df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .count()  # standard aggregate in place of applyInPandas
)
{code}
The output of this query contains *one* row per {{bucket}} value.
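To make the difference between the two results concrete, here is a small plain-Python simulation. It does not use Spark and does not model the watermark or Spark's internals; the microbatch contents and function names are made up for illustration. It only contrasts grouping each microbatch in isolation (the behaviour reported for applyInPandas) with accumulating counts across microbatches (the behaviour reported for count()).

```python
from collections import Counter

# Hypothetical microbatches: one bucket label per incoming row.
microbatches = [
    ["00:10", "00:10", "00:10"],
    ["00:10", "00:10", "00:20"],
    ["00:20", "00:20", "00:20"],
]

def per_batch_counts(batches):
    """Group each microbatch on its own and emit its rows immediately."""
    rows = []
    for batch in batches:
        rows.extend(sorted(Counter(batch).items()))
    return rows

def accumulated_counts(batches):
    """Carry counts across microbatches and emit one row per bucket."""
    totals = Counter()
    for batch in batches:
        totals.update(batch)
    return sorted(totals.items())

observed = per_batch_counts(microbatches)
expected = accumulated_counts(microbatches)
print(observed)
print(expected)
```

In this simulation the per-batch version yields four rows for two buckets, with partial counts, while the accumulated version yields two rows whose counts cover all input rows.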
--
This message was sent by Atlassian Jira
(v8.20.10#820010)