Chloe He created SPARK-47718:
--------------------------------
Summary: .sql() does not recognize watermark defined upstream
Key: SPARK-47718
URL: https://issues.apache.org/jira/browse/SPARK-47718
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 3.5.1
Reporter: Chloe He
I have a data pipeline that reads data from a Kafka source, transforms it
using PySpark, then writes the output to a sink (Kafka, Redis, etc.).
My entire pipeline is written in SQL, so I want to use the .sql() method to
execute SQL on my streaming source directly.
However, the watermark I define upstream is not recognized by the downstream
query when it is run via the .sql() method.
```
Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:49:36) [Clang 16.0.6 ] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyspark
>>> print(pyspark.__version__)
3.5.1
>>> from pyspark.sql import SparkSession
>>>
>>> session = SparkSession.builder \
... .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\
... .getOrCreate()
>>> from pyspark.sql.functions import col, from_json
>>> from pyspark.sql.types import StructField, StructType, TimestampType, LongType, DoubleType, IntegerType
>>> schema = StructType(
... [
... StructField('createTime', TimestampType(), True),
... StructField('orderId', LongType(), True),
... StructField('payAmount', DoubleType(), True),
... StructField('payPlatform', IntegerType(), True),
... StructField('provinceId', IntegerType(), True),
... ])
>>>
>>> streaming_df = session.readStream\
... .format("kafka")\
... .option("kafka.bootstrap.servers", "localhost:9092")\
... .option("subscribe", "payment_msg")\
... .option("startingOffsets","earliest")\
... .load()\
... .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
... .select("parsed_value.*")\
... .withWatermark("createTime", "10 seconds")
>>>
>>> streaming_df.createOrReplaceTempView("streaming_df")
>>> session.sql("""
... SELECT
... window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
... FROM streaming_df
... GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
... ORDER BY window.start
... """)\
... .writeStream\
... .format("kafka") \
... .option("checkpointLocation", "checkpoint") \
... .option("kafka.bootstrap.servers", "localhost:9092") \
... .option("topic", "sink") \
... .start()
```
This throws the following exception:
```
pyspark.errors.exceptions.captured.AnalysisException: Append output mode not
supported when there are streaming aggregations on streaming
DataFrames/DataSets without watermark; line 6 pos 4;
```
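One way to narrow this down might be to compare the analyzed logical plans of the DataFrame and of a query issued through .sql(), and check whether each still contains a watermark node. The following is only a minimal sketch under several assumptions: it uses Spark's built-in `rate` source in place of Kafka, it reaches into PySpark's internal `_jdf` handle (so it would not work under Spark Connect), and the helper names `has_watermark`, `analyzed_plan_text` and `main` are hypothetical, not part of any Spark API.

```python
def has_watermark(plan_text: str) -> bool:
    """Return True if the plan text mentions an EventTimeWatermark node."""
    return "EventTimeWatermark" in plan_text


def analyzed_plan_text(df) -> str:
    """Return the analyzed logical plan of a DataFrame as a string.

    Assumption: relies on the internal `_jdf` attribute of a classic
    (non-Connect) PySpark DataFrame.
    """
    return df._jdf.queryExecution().analyzed().toString()


def main() -> None:
    # Imported here so the helpers above can be used without a Spark install.
    from pyspark.sql import SparkSession

    session = SparkSession.builder.getOrCreate()

    # The rate source emits (timestamp, value) rows; rename the timestamp
    # column so it matches the event-time column name used in the report.
    streaming_df = (
        session.readStream.format("rate").load()
        .withColumnRenamed("timestamp", "createTime")
        .withWatermark("createTime", "10 seconds")
    )
    streaming_df.createOrReplaceTempView("streaming_df")

    # Query the temp view through .sql() without starting a stream.
    via_sql = session.sql("SELECT * FROM streaming_df")

    print("DataFrame plan has watermark:",
          has_watermark(analyzed_plan_text(streaming_df)))
    print("SQL plan has watermark:",
          has_watermark(analyzed_plan_text(via_sql)))
```

Calling `main()` from a Python session with PySpark installed prints one line per plan. If both report a watermark, the problem is more likely in how the aggregation refers to the event-time column than in the temp view itself.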