Richard Swinbank created SPARK-48992:
----------------------------------------

             Summary: applyInPandas does not respect streaming watermark
                 Key: SPARK-48992
                 URL: https://issues.apache.org/jira/browse/SPARK-48992
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.5.0
         Environment: Azure Databricks runtime 14.3 LTS
            Reporter: Richard Swinbank


When I use GroupedData.applyInPandas to implement aggregation in a streaming 
query, it fails to respect a watermark specified using DataFrame.withWatermark.

This query reproduces the behaviour I'm seeing:
 
{code:python}
from pyspark.sql.functions import window
from typing import Tuple
import pandas as pd

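# `spark` is the SparkSession predefined in Databricks notebooks.
# The rate source emits (timestamp, value) rows; `bucket` is the end of
# each row's 10-second tumbling window.
df_source_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 3)
    .load()
    .withColumn("bucket", window("timestamp", "10 seconds").end)
)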

def my_function(
    key: Tuple[str], df: pd.DataFrame
) -> pd.DataFrame:
    return pd.DataFrame({"bucket": [key[0]], "count": [df.shape[0]]})

df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .applyInPandas(my_function, "bucket TIMESTAMP, count INT")
)
display(df)  # Databricks notebook helper that starts and renders the stream
{code}
I expect the output of the query to contain one row per {{bucket}} value, but a 
new row is emitted for each incoming microbatch.
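The behaviour described above can be modelled outside Spark. The sketch below is a pandas-only simulation, not Spark's implementation: it assumes each microbatch is grouped and handed to the function independently, with no state carried between batches, which matches the output reported here. The batch contents and the `make_batch` and `run_stateless` helpers are hypothetical; `my_function` mirrors the function from the report, with the key annotated as a pandas Timestamp to match the simulated data.

```python
import pandas as pd
from typing import Tuple


def make_batch(*times: str) -> pd.DataFrame:
    """Build a hypothetical microbatch holding only the `bucket` column."""
    return pd.DataFrame({"bucket": pd.to_datetime(list(times))})


# Two hypothetical microbatches; the 10:00:10 bucket receives rows in both.
batches = [
    make_batch("2024-07-24 10:00:10", "2024-07-24 10:00:10", "2024-07-24 10:00:10"),
    make_batch("2024-07-24 10:00:10", "2024-07-24 10:00:10", "2024-07-24 10:00:20"),
]


def my_function(key: Tuple[pd.Timestamp], df: pd.DataFrame) -> pd.DataFrame:
    # Same logic as the report: one output row per group with its row count.
    return pd.DataFrame({"bucket": [key[0]], "count": [df.shape[0]]})


def run_stateless(batches):
    """Group each microbatch on its own and apply the function; no state is kept."""
    out = []
    for batch in batches:
        for bucket, group in batch.groupby("bucket"):
            out.append(my_function((bucket,), group))
    return pd.concat(out, ignore_index=True)


print(run_stateless(batches))
```

Running it prints three rows, two of them for the 10:00:10 bucket (counts 3 and 2), which is the repeated-row pattern described in the report.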

In contrast, an out-of-the-box aggregate behaves as expected. For example:
{code:python}
df = (
    df_source_stream
    .withWatermark("bucket", "10 seconds")
    .groupBy("bucket")
    .count()  # standard aggregate in place of applyInPandas
)
{code}
The output of this query contains *one* row per {{bucket}} value.
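For comparison, the sketch below models what the built-in aggregate appears to do: it carries counts in state across microbatches and emits each bucket once the watermark has passed it. This is a simplified pandas model under stated assumptions, not Spark's implementation; for instance, Spark applies the watermark computed at the end of one trigger in the following one (possibly a no-data batch), whereas the sketch evicts immediately. `make_batch`, `run_watermarked_count` and the batch contents are hypothetical.

```python
import pandas as pd


def make_batch(*times: str) -> pd.DataFrame:
    """Build a hypothetical microbatch holding only the `bucket` column."""
    return pd.DataFrame({"bucket": pd.to_datetime(list(times))})


def run_watermarked_count(batches, delay=pd.Timedelta(seconds=10)):
    """Simplified model of groupBy("bucket").count() with a watermark.

    Assumptions, not Spark's exact semantics: counts persist across batches,
    the watermark is the largest bucket seen minus `delay`, rows at or below
    the watermark are dropped as late, and a bucket is emitted once, when the
    watermark reaches it (append-mode style), then evicted from state.
    """
    state = {}  # bucket -> running count carried between microbatches
    watermark = None
    emitted = []
    for batch in batches:
        if watermark is not None:
            batch = batch[batch["bucket"] > watermark]  # drop late rows
        for bucket, n in batch["bucket"].value_counts().items():
            state[bucket] = state.get(bucket, 0) + int(n)
        if not batch.empty:
            # Advance the watermark from the largest event time seen so far.
            candidate = batch["bucket"].max() - delay
            watermark = candidate if watermark is None else max(watermark, candidate)
        if watermark is not None:
            # Finalize and evict every bucket the watermark has reached.
            for bucket in sorted(b for b in state if b <= watermark):
                emitted.append({"bucket": bucket, "count": state.pop(bucket)})
    return pd.DataFrame(emitted, columns=["bucket", "count"])


batches = [
    make_batch("2024-07-24 10:00:10", "2024-07-24 10:00:10", "2024-07-24 10:00:10"),
    make_batch("2024-07-24 10:00:10", "2024-07-24 10:00:10", "2024-07-24 10:00:20"),
    make_batch("2024-07-24 10:00:30"),
    make_batch("2024-07-24 10:00:40"),
]
print(run_watermarked_count(batches))
```

Each finalized bucket appears exactly once with its total across batches (5 for the 10:00:10 bucket), unlike the per-batch rows described in the report; the 10:00:40 bucket stays in state because the watermark has not reached it yet.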
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
