[
https://issues.apache.org/jira/browse/SPARK-50661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon reassigned SPARK-50661:
------------------------------------
Assignee: Haiyang Sun
> Scala Streaming foreachBatch doesn't work for input type Dataset[T] due to
> missing encoder for T.
> -------------------------------------------------------------------------------------------------
>
> Key: SPARK-50661
> URL: https://issues.apache.org/jira/browse/SPARK-50661
> Project: Spark
> Issue Type: Bug
> Components: Connect, Structured Streaming
> Affects Versions: 4.0.0, 3.5.4
> Reporter: Haiyang Sun
> Assignee: Haiyang Sun
> Priority: Major
> Labels: pull-request-available
>
> Currently, the Spark Connect implementation of Scala streaming foreachBatch
> does not capture the encoder for T when the input is Dataset[T]. This leads
> to a runtime exception when the function passed is `(Dataset[T], Long) =>
> Unit`, because it is executed as `(DataFrame, Long) => Unit`.
>
> We can reproduce the failure with the following test:
>
> ```
> import org.apache.spark.sql._
> import org.apache.spark.sql.streaming._
>
> val inputPath = "/tmp/input"
> case class C(id: Int, name: String, age: Int)
>
> // Write sample CSV data to read back as a stream.
> val data = Seq((1, "Alice", 29), (2, "Bob", 35), (3, "Cathy", 26),
>   (4, "David", 40), (5, "Eve", 30)).toDF("id", "name", "age")
> data.write.format("csv").mode("overwrite").save(inputPath)
>
> // Read the data back as a typed streaming Dataset[C]; foreachBatch over
> // Dataset[C] is what requires the encoder for C.
> val df = spark.readStream.format("csv")
>   .schema("id INT, name STRING, age INT")
>   .load(inputPath)
> val query = df.as[C].writeStream.foreachBatch { (batch: Dataset[C], batchId: Long) =>
>   print(batch.collectAsList())
> }.trigger(Trigger.ProcessingTime("1 second")).start()
> Thread.sleep(2000)
> query.stop()
> ```
>
> The fix should address this by passing the encoder for T together with the
> function definition (as is done for other Scala UDFs), and then restoring
> the encoder in SparkConnectPlanner.
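>
> The intended pattern can be sketched without Spark (all names here — Row,
> Encoder, ForeachBatchFn, runBatch — are hypothetical stand-ins, not Spark's
> actual API): the client ships the user function bundled with its encoder, and
> the server side restores the encoder to decode untyped rows back into T
> before invoking the typed function.
>
> ```
> // Stand-in for an untyped row as the server sees it.
> case class Row(fields: Map[String, Any])
>
> // Stand-in for Spark's encoder: knows how to rebuild T from a Row.
> trait Encoder[T] { def fromRow(row: Row): T }
>
> case class Person(id: Int, name: String)
>
> object PersonEncoder extends Encoder[Person] {
>   def fromRow(row: Row): Person =
>     Person(row.fields("id").asInstanceOf[Int],
>            row.fields("name").asInstanceOf[String])
> }
>
> // What the client ships: the user function together with its encoder,
> // analogous to how other Scala UDFs are serialized with their encoders.
> case class ForeachBatchFn[T](fn: (Seq[T], Long) => Unit, encoder: Encoder[T])
>
> // What the planner does after restoring the encoder: decode the untyped
> // batch into typed values, then invoke the user's typed function.
> def runBatch[T](packed: ForeachBatchFn[T], rows: Seq[Row], batchId: Long): Unit =
>   packed.fn(rows.map(packed.encoder.fromRow), batchId)
>
> // Usage: the typed function receives Person values, not raw rows.
> var seen: Seq[Person] = Nil
> val packed = ForeachBatchFn[Person]((people, _) => seen = people, PersonEncoder)
> runBatch(packed, Seq(Row(Map("id" -> 1, "name" -> "Alice"))), 0L)
> ```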
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]