[ https://issues.apache.org/jira/browse/SPARK-48992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Richard Swinbank updated SPARK-48992:
-------------------------------------
Description:
When I use GroupedData.applyInPandas to implement aggregation in a streaming
query, it fails to respect a watermark specified using DataFrame.withWatermark.
This query reproduces the behaviour I'm seeing:
{code:python}
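from pyspark.sql.functions import window
from typing import Tuple
import pandas as pd

# `spark` is the SparkSession provided by the Databricks notebook.
# The "rate" source emits 3 rows per second; each row is assigned to the
# end timestamp of its 10-second tumbling window.
df_source_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 3)
    .load()
    .withColumn("bucket", window("timestamp", "10 seconds").end)
)

# Count the rows passed in for each bucket.
def my_function(
    key: Tuple[str], df: pd.DataFrame
) -> pd.DataFrame:
    return pd.DataFrame({"bucket": [key[0]], "count": [df.shape[0]]})

df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .applyInPandas(my_function, "bucket TIMESTAMP, count INT")
)

# `display` is the Databricks notebook helper; it starts the streaming query.
display(df)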
{code}
I expect the output of the query to contain one row per {{bucket}} value, but a
new row is emitted for each incoming microbatch.
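To make the symptom concrete, here is a minimal pure-Python model (no Spark involved) of grouping each micro-batch independently, with nothing carried over between batches. The `MICRO_BATCHES` data and the `run_stateless` helper are hypothetical, and the model is an assumption about the observed output, not a description of Spark's internals: a bucket that receives rows in two micro-batches produces two output rows.

```python
from collections import Counter
from typing import List, Tuple

# Hypothetical micro-batches: each is a list of bucket values (one per input row).
# Buckets 10 and 20 each receive rows in two different micro-batches.
MICRO_BATCHES: List[List[int]] = [
    [10, 10, 10],
    [10, 20, 20],
    [20, 30],
]


def run_stateless(batches: List[List[int]]) -> List[Tuple[int, int]]:
    """Group and count each micro-batch on its own (hypothetical model)."""
    output: List[Tuple[int, int]] = []
    for batch in batches:
        # Only rows from the current batch are visible; no counts carry over.
        for bucket, count in sorted(Counter(batch).items()):
            output.append((bucket, count))
    return output


print(run_stateless(MICRO_BATCHES))
# → [(10, 3), (10, 1), (20, 2), (20, 1), (30, 1)]
```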
In contrast, an out-of-the-box aggregate behaves as expected. For example:
{code:python}
df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .count()  # standard aggregate in place of applyInPandas
)
{code}
The output of this query contains *one* row per {{bucket}} value.
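For comparison, here is a simplified pure-Python model of a watermarked, append-mode aggregation: counts are kept across micro-batches, and a bucket is emitted only once, after the watermark (maximum event time seen minus the delay) has moved past it. The data, the `run_with_watermark` helper and the exact eviction rule are assumptions for illustration; Spark's real timing of watermark advancement and state eviction is not reproduced here.

```python
from typing import Dict, List, Tuple

DELAY = 10  # watermark delay in seconds, mirroring withWatermark("bucket", "10 seconds")

# Hypothetical micro-batches of bucket values (window end times, in seconds).
MICRO_BATCHES: List[List[int]] = [
    [10, 10, 10],
    [10, 20, 20],
    [20, 30],
    [40],
]


def run_with_watermark(batches: List[List[int]], delay: int) -> List[Tuple[int, int]]:
    """Simplified model: emit each bucket once, after the watermark passes it."""
    counts: Dict[int, int] = {}  # running count per bucket, kept across micro-batches
    watermark = float("-inf")
    max_event_time = float("-inf")
    output: List[Tuple[int, int]] = []
    for batch in batches:
        for bucket in batch:
            if bucket < watermark:
                continue  # late row: older than the current watermark, dropped
            counts[bucket] = counts.get(bucket, 0) + 1
            max_event_time = max(max_event_time, bucket)
        # Advance the watermark to (max event time seen so far) - delay.
        watermark = max(watermark, max_event_time - delay)
        # Finalize and emit every bucket the watermark has moved past.
        for bucket in sorted(b for b in counts if b < watermark):
            output.append((bucket, counts.pop(bucket)))
    return output


print(run_with_watermark(MICRO_BATCHES, DELAY))
# → [(10, 4), (20, 3)]
```

With these batches, buckets 30 and 40 are still held in state because the watermark has not yet passed them; in this model each bucket appears in the output at most once.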
> applyInPandas does not respect streaming watermark
> --------------------------------------------------
>
> Key: SPARK-48992
> URL: https://issues.apache.org/jira/browse/SPARK-48992
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.5.0
> Environment: Azure Databricks runtime 14.3 LTS
> Reporter: Richard Swinbank
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)