[ https://issues.apache.org/jira/browse/SPARK-48992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Richard Swinbank updated SPARK-48992:
-------------------------------------
    Description: 
When I use GroupedData.applyInPandas to implement aggregation in a streaming 
query, it fails to respect a watermark specified using DataFrame.withWatermark.

This query reproduces the behaviour I'm seeing:
 
{code:python}
from pyspark.sql.functions import window
from typing import Tuple
import pandas as pd

df_source_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 3)
    .load()
    .withColumn("bucket", window("timestamp", "10 seconds").end)
)

def my_function(
    key: Tuple[pd.Timestamp], df: pd.DataFrame
) -> pd.DataFrame:
    # Emit one row per group: the bucket value and the number of rows in the group.
    return pd.DataFrame({"bucket": [key[0]], "count": [df.shape[0]]})

df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .applyInPandas(my_function, "bucket TIMESTAMP, count INT")
)
display(df)  # Databricks notebook helper that renders the streaming output
{code}
I expect the output of the query to contain one row per {{bucket}} value, but 
instead each incoming microbatch emits a new row for the {{bucket}} values it 
contains, so the same bucket appears in the output multiple times.

In contrast, an out-of-the-box aggregate behaves as expected. For example:
{code:python}
df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .count()  # standard aggregate in place of applyInPandas
)
{code}
The output of this query contains *one* row per {{bucket}} value.
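
A pandas-only sketch (no Spark required) may help illustrate the difference between the two outputs. It assumes that the {{applyInPandas}} query groups each microbatch on its own, with no state carried between batches, which is consistent with the output described above. The built-in aggregate is modelled, in simplified form, as a running count kept across batches; watermark eviction and output modes are deliberately ignored. The helper names {{per_batch_apply}} and {{stateful_count}} and the sample data are hypothetical and exist only for this illustration.

{code:python}
# Pandas-only simulation; no Spark needed.
# Assumption: each microbatch is grouped independently (no cross-batch state),
# matching the output reported above for applyInPandas.
from typing import Tuple

import pandas as pd


def my_function(key: Tuple[pd.Timestamp], df: pd.DataFrame) -> pd.DataFrame:
    # Same per-group logic as the repro: one row per bucket with its row count.
    return pd.DataFrame({"bucket": [key[0]], "count": [df.shape[0]]})


def per_batch_apply(batches):
    """Hypothetical helper: apply my_function to each microbatch separately."""
    out = []
    for batch in batches:
        for bucket, group in batch.groupby("bucket"):
            out.append(my_function((bucket,), group))
    return pd.concat(out, ignore_index=True)


def stateful_count(batches):
    """Hypothetical helper: keep a running count per bucket across batches.

    Simplified model of a stateful aggregate; ignores watermark eviction
    and streaming output modes.
    """
    totals = {}
    for batch in batches:
        for bucket, n in batch.groupby("bucket").size().items():
            totals[bucket] = totals.get(bucket, 0) + int(n)
    return pd.DataFrame({"bucket": list(totals), "count": list(totals.values())})


# Hypothetical input: 30 rows in a single 10-second bucket,
# delivered as two microbatches of 15 rows each.
bucket = pd.Timestamp("2024-01-01 00:00:10")
batches = [pd.DataFrame({"bucket": [bucket] * 15}) for _ in range(2)]

print(per_batch_apply(batches))  # two rows for the same bucket, count 15 each
print(stateful_count(batches))   # one row for the bucket, count 30
{code}

The per-batch version reproduces the "new row per microbatch" pattern, while the running count yields a single row per {{bucket}}, mirroring the two query outputs described in this report.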
 

  was:
When I use GroupedData.applyInPandas to implement aggregation in a streaming 
query, it fails to respect a watermark specified using DataFrame.withWatermark.

This query reproduces the behvaiour I'm seeing:
 
{code:python}
from pyspark.sql.functions import window
from typing import Tuple
import pandas as pd

df_source_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 3)
    .load()
    .withColumn("bucket", window("timestamp", "10 seconds").end)
)

def my_function(
    key: Tuple[str], df: pd.DataFrame
) -> pd.DataFrame:
    return pd.DataFrame({"bucket": [key[0]], "count": [df.shape[0]]})

df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .applyInPandas(my_function, "bucket TIMESTAMP, count INT")
)
display(df)
{code}
I expect the output of the query to contain one row per {{bucket}} value, but a 
new row is emitted for each incoming microbatch.

In contrast, an out of the box aggregate behaves as expected. For example:
{code:python}
df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .count()  # standard aggregate in place of applyInPandas
)
{code}
The output of this query contains *one* row per {{bucket}} value.
 


> applyInPandas does not respect streaming watermark
> --------------------------------------------------
>
>                 Key: SPARK-48992
>                 URL: https://issues.apache.org/jira/browse/SPARK-48992
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0
>         Environment: Azure Databricks runtime 14.3 LTS
>            Reporter: Richard Swinbank
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
