[
https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michael Armbrust updated SPARK-19903:
-------------------------------------
Description:
A PySpark example reads a Kafka stream. A watermark is set on the windowed
aggregation, and the query uses Append output mode.
Despite the watermark, the PySpark engine reports the error:
'Append output mode not supported when there are streaming aggregations on
streaming DataFrames/DataSets'
The Python example:
-------------------------------------------------------------------------------
{code}
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window
if __name__ == "__main__":
if len(sys.argv) != 4:
print("""
Usage: structured_kafka_wordcount.py <bootstrap-servers>
<subscribe-type> <topics>
""", file=sys.stderr)
exit(-1)
bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]
spark = SparkSession\
.builder\
.appName("StructuredKafkaWordCount")\
.getOrCreate()
# Create DataSet representing the stream of input lines from kafka
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()\
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array
into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
window(words.timestamp, "30 seconds", "30 seconds"), words.word
).count()
# Start running the query that prints the running counts to the console
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format('console')\
.option("truncate", "false")\
.start()
query.awaitTermination()
{code}
The corresponding example in Zeppelin notebook:
{code}
%spark.pyspark
from pyspark.sql.functions import explode, split, window
# Create DataSet representing the stream of input lines from kafka
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", "words")\
.load()\
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into
multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
window(words.timestamp, "30 seconds", "30 seconds"), words.word
).count()
# Start running the query that prints the running counts to the console
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format('console')\
.option("truncate", "false")\
.start()
query.awaitTermination()
{code}
Note that the Scala version of the same example in Zeppelin notebook works fine:
{code}
import java.sql.Timestamp
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.functions._
// Create DataSet representing the stream of input lines from kafka
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "words")
.load()
// Split the lines into words, retaining timestamps
val words = lines
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
.as[(String, Timestamp)]
.flatMap(line => line._1.split(" ").map(word => (word, line._2)))
.toDF("word", "timestamp")
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "30 seconds")
.groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word")
.count()
// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
.outputMode("append")
.format("console")
.trigger(ProcessingTime("35 seconds"))
.option("truncate", "false")
.start()
query.awaitTermination()
{code}
was:
PySpark example reads a Kafka stream. There is watermarking set when handling
the data window. The defined query uses output Append mode.
The PySpark engine reports the error:
'Append output mode not supported when there are streaming aggregations on
streaming DataFrames/DataSets'
The Python example:
-------------------------------------------------------------------------------
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window
if __name__ == "__main__":
if len(sys.argv) != 4:
print("""
Usage: structured_kafka_wordcount.py <bootstrap-servers>
<subscribe-type> <topics>
""", file=sys.stderr)
exit(-1)
bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]
spark = SparkSession\
.builder\
.appName("StructuredKafkaWordCount")\
.getOrCreate()
# Create DataSet representing the stream of input lines from kafka
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()\
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array
into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
window(words.timestamp, "30 seconds", "30 seconds"), words.word
).count()
# Start running the query that prints the running counts to the console
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format('console')\
.option("truncate", "false")\
.start()
query.awaitTermination()
---------------------------------------------------------------------
The corresponding example in Zeppelin notebook:
---------------------------------------------------------------
%spark.pyspark
from pyspark.sql.functions import explode, split, window
# Create DataSet representing the stream of input lines from kafka
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", "words")\
.load()\
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into
multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
window(words.timestamp, "30 seconds", "30 seconds"), words.word
).count()
# Start running the query that prints the running counts to the console
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format('console')\
.option("truncate", "false")\
.start()
query.awaitTermination()
--------------------------------------------------------------------------------------
Note that the Scala version of the same example in Zeppelin notebook works fine:
----------------------------------------------------------------------------------------
import java.sql.Timestamp
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.functions._
// Create DataSet representing the stream of input lines from kafka
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "words")
.load()
// Split the lines into words, retaining timestamps
val words = lines
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
.as[(String, Timestamp)]
.flatMap(line => line._1.split(" ").map(word => (word, line._2)))
.toDF("word", "timestamp")
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "30 seconds")
.groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word")
.count()
// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
.outputMode("append")
.format("console")
.trigger(ProcessingTime("35 seconds"))
.option("truncate", "false")
.start()
query.awaitTermination()
-----------------------------------------------------------------------------------------
> PySpark Kafka streaming query output append mode not possible
> ------------------------------------------------------------
>
> Key: SPARK-19903
> URL: https://issues.apache.org/jira/browse/SPARK-19903
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Structured Streaming
> Affects Versions: 2.1.0
> Environment: Ubuntu Linux
> Reporter: Piotr Nestorow
>
> A PySpark example reads a Kafka stream. A watermark is set on the windowed
> aggregation, and the query uses Append output mode.
> Despite the watermark, the PySpark engine reports the error:
> 'Append output mode not supported when there are streaming aggregations on
> streaming DataFrames/DataSets'
> The Python example:
> -------------------------------------------------------------------------------
> {code}
> import sys
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import explode, split, window
> if __name__ == "__main__":
> if len(sys.argv) != 4:
> print("""
> Usage: structured_kafka_wordcount.py <bootstrap-servers>
> <subscribe-type> <topics>
> """, file=sys.stderr)
> exit(-1)
> bootstrapServers = sys.argv[1]
> subscribeType = sys.argv[2]
> topics = sys.argv[3]
> spark = SparkSession\
> .builder\
> .appName("StructuredKafkaWordCount")\
> .getOrCreate()
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
> .readStream\
> .format("kafka")\
> .option("kafka.bootstrap.servers", bootstrapServers)\
> .option(subscribeType, topics)\
> .load()\
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array
> into multiple rows
> words = lines.select(
> explode(split(lines.value, ' ')).alias('word'),
> lines.timestamp
> )
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
> window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
> .writeStream\
> .outputMode('append')\
> .format('console')\
> .option("truncate", "false")\
> .start()
> query.awaitTermination()
> {code}
> The corresponding example in Zeppelin notebook:
> {code}
> %spark.pyspark
> from pyspark.sql.functions import explode, split, window
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
> .readStream\
> .format("kafka")\
> .option("kafka.bootstrap.servers", "localhost:9092")\
> .option("subscribe", "words")\
> .load()\
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array into
> multiple rows
> words = lines.select(
> explode(split(lines.value, ' ')).alias('word'),
> lines.timestamp
> )
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
> window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
> .writeStream\
> .outputMode('append')\
> .format('console')\
> .option("truncate", "false")\
> .start()
> query.awaitTermination()
> {code}
> Note that the Scala version of the same example in Zeppelin notebook works
> fine:
> {code}
> import java.sql.Timestamp
> import org.apache.spark.sql.streaming.ProcessingTime
> import org.apache.spark.sql.functions._
> // Create DataSet representing the stream of input lines from kafka
> val lines = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", "localhost:9092")
> .option("subscribe", "words")
> .load()
> // Split the lines into words, retaining timestamps
> val words = lines
> .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS
> TIMESTAMP)")
> .as[(String, Timestamp)]
> .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
> .toDF("word", "timestamp")
> // Group the data by window and word and compute the count of each group
> val windowedCounts = words
> .withWatermark("timestamp", "30 seconds")
> .groupBy(window($"timestamp", "30 seconds", "30 seconds"),
> $"word")
> .count()
> // Start running the query that prints the windowed word counts to the console
> val query = windowedCounts.writeStream
> .outputMode("append")
> .format("console")
> .trigger(ProcessingTime("35 seconds"))
> .option("truncate", "false")
> .start()
> query.awaitTermination()
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]