[https://issues.apache.org/jira/browse/SPARK-50642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Hyukjin Kwon resolved SPARK-50642.
----------------------------------
Resolution: Fixed
Issue resolved by pull request 49260
[https://github.com/apache/spark/pull/49260]
> Fix the state schema for FlatMapGroupsWithState in spark connect when there is no initial state
> -----------------------------------------------------------------------------------------------
>
> Key: SPARK-50642
> URL: https://issues.apache.org/jira/browse/SPARK-50642
> Project: Spark
> Issue Type: Bug
> Components: Connect, Structured Streaming
> Affects Versions: 3.5.0
> Reporter: Huanli Wang
> Assignee: Huanli Wang
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.0.0
>
>
> In Spark Connect, when there is no initial state, we derive the state schema
> from the input:
> * We create the `initialDs` from the original input:
> [https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L679-L689]
> * We then derive the state expression encoder from this `initialDs`, which is
> incorrect because `initialDs` carries the input's value type rather than the
> state type:
> [https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L729]
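To make the failure mode concrete, here is a minimal, self-contained Scala sketch of the encoder selection described above. It does not use Spark at all: `ToyEncoder`, `buggyStateEncoder`, `fixedStateEncoder`, and `persist` are hypothetical names chosen for illustration, and the "fixed" variant only shows the idea of falling back to the declared state type when no initial state is supplied. It is not the code from pull request 49260.

```scala
object StateEncoderSketch {
  // Hypothetical stand-ins for the test's input and state types.
  final case class ClickEvent(id: String, timestamp: Long)
  final case class ClickState(id: String, count: Int)

  // Toy "encoder": turns a value into a row by casting it to the class it was built for.
  final case class ToyEncoder(typeName: String, toRow: Any => Seq[Any])

  val clickEventEncoder: ToyEncoder = ToyEncoder("ClickEvent", v => {
    val e = v.asInstanceOf[ClickEvent] // throws ClassCastException for any other class
    Seq(e.id, e.timestamp)
  })

  val clickStateEncoder: ToyEncoder = ToyEncoder("ClickState", v => {
    val s = v.asInstanceOf[ClickState]
    Seq(s.id, s.count)
  })

  // Buggy choice: with no initial state, reuse the encoder of the input values.
  def buggyStateEncoder(initialState: Option[ToyEncoder], input: ToyEncoder): ToyEncoder =
    initialState.getOrElse(input)

  // Illustrative fix: with no initial state, use the state type the user function declares.
  def fixedStateEncoder(initialState: Option[ToyEncoder], declaredState: ToyEncoder): ToyEncoder =
    initialState.getOrElse(declaredState)

  // Simulates persisting the state the user function produced.
  def persist(enc: ToyEncoder, state: ClickState): Either[String, Seq[Any]] =
    try Right(enc.toRow(state))
    catch { case e: ClassCastException => Left(e.getClass.getSimpleName) }
}
```

With no initial state, `persist(buggyStateEncoder(None, clickEventEncoder), ClickState("a", 3))` ends in a `ClassCastException`, the same direction of cast (a `ClickState` treated as a `ClickEvent`) as in the stack trace in this report, while the fixed selection produces a row. When an initial state is present, both variants pick its encoder, which is why that path was unaffected.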
>
> Our unit tests do not cover this case because their state function never
> updates the state:
> [https://github.com/apache/spark/blob/master/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateStreamingSuite.scala#L55-L59]
> After changing the `stateFunc` to the following
> {code:java}
> val stateFunc =
>   (key: String, values: Iterator[ClickEvent], state: GroupState[ClickState]) => {
>     // No initial state is supplied, so no state should exist yet.
>     if (state.exists) throw new IllegalArgumentException("state.exists should be false")
>     val newState = ClickState(key, values.size)
>     // Writing the state is what exercises the state encoder.
>     state.update(newState)
>     Iterator(newState)
>   } {code}
> the test fails with:
> {code:java}
> Cause: org.apache.spark.SparkException: Job aborted due to stage failure: Task 122 in stage 2.0 failed 1 times, most recent failure: Lost task 122.0 in stage 2.0 (TID 12) (192.168.68.84 executor driver): java.lang.ClassCastException: class org.apache.spark.sql.streaming.ClickState cannot be cast to class org.apache.spark.sql.streaming.ClickEvent (org.apache.spark.sql.streaming.ClickState and org.apache.spark.sql.streaming.ClickEvent are in unnamed module of loader 'app')
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.CreateNamedStruct_0$(Unknown Source)
> {code}
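The coverage gap in the original test can also be sketched without Spark. In this hypothetical model, `ToyGroupState` stands in for `GroupState` and is assumed to invoke its serializer only inside `update`, so a state function that never writes state never exercises the (wrong) state encoder. `ToyGroupState`, `wrongSerializer`, `noUpdate`, `withUpdate`, and `run` are illustrative names, and the serialize-on-update behavior is an assumption made for the sketch, not a description of Spark internals.

```scala
object StateUpdateSketch {
  final case class ClickEvent(id: String, timestamp: Long)
  final case class ClickState(id: String, count: Int)

  // Hypothetical stand-in for GroupState: the serializer only runs on update().
  final class ToyGroupState[S](serialize: Any => Seq[Any]) {
    private var value: Option[S] = None
    def exists: Boolean = value.isDefined
    def update(s: S): Unit = { serialize(s); value = Some(s) }
  }

  // Serializer built for the input type, mimicking the buggy state encoder.
  val wrongSerializer: Any => Seq[Any] = v => {
    val e = v.asInstanceOf[ClickEvent]
    Seq(e.id, e.timestamp)
  }

  // Shape of the original test: emits output but never touches the state.
  def noUpdate(key: String, values: Iterator[ClickEvent], state: ToyGroupState[ClickState]): Iterator[ClickState] =
    Iterator(ClickState(key, values.size))

  // Shape of the modified stateFunc from this report: writes the state.
  def withUpdate(key: String, values: Iterator[ClickEvent], state: ToyGroupState[ClickState]): Iterator[ClickState] = {
    if (state.exists) throw new IllegalArgumentException("state.exists should be false")
    val newState = ClickState(key, values.size)
    state.update(newState)
    Iterator(newState)
  }

  // Runs a state function for a single key with two events.
  def run(f: (String, Iterator[ClickEvent], ToyGroupState[ClickState]) => Iterator[ClickState])
      : Either[String, List[ClickState]] = {
    val events = Iterator(ClickEvent("a", 1L), ClickEvent("a", 2L))
    try Right(f("a", events, new ToyGroupState[ClickState](wrongSerializer)).toList)
    catch { case e: ClassCastException => Left(e.getClass.getSimpleName) }
  }
}
```

Under these assumptions, `run(noUpdate)` completes normally even though the state serializer is wrong, whereas `run(withUpdate)` hits the cast failure. This is why a test only passes reliably as a regression check once its state function actually updates the state.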
--
This message was sent by Atlassian Jira
(v8.20.10#820010)