[ https://issues.apache.org/jira/browse/SPARK-50642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-50642.
----------------------------------
    Resolution: Fixed

Issue resolved by pull request 49260
[https://github.com/apache/spark/pull/49260]

> Fix the state schema for FlatMapGroupsWithState in spark connect when there is no initial state
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-50642
>                 URL: https://issues.apache.org/jira/browse/SPARK-50642
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, Structured Streaming
>    Affects Versions: 3.5.0
>            Reporter: Huanli Wang
>            Assignee: Huanli Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>
> In Spark Connect, when there is no initial state, we derive the state schema
> from the input:
>  * we create `initialDs` from the original input:
> [https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L679-L689]
>  * we then derive the state expression encoder from this `initialDs`, which is
> incorrect because `initialDs` has the input type, not the state type:
> [https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L729]
>  
> Our unit tests do not cover this case because the state function never updates
> the state:
> [https://github.com/apache/spark/blob/master/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateStreamingSuite.scala#L55-L59]
> After changing `stateFunc` to the following
> {code:scala}
> val stateFunc =
>   (key: String, values: Iterator[ClickEvent], state: GroupState[ClickState]) => {
>     // No initial state is provided, so no state should exist yet.
>     if (state.exists) throw new IllegalArgumentException("state.exists should be false")
>     val newState = ClickState(key, values.size)
>     // Updating the state means it must be serialized with the state encoder.
>     state.update(newState)
>     Iterator(newState)
>   } {code}
> the test fails with
> {code:java}
> Cause: org.apache.spark.SparkException: Job aborted due to stage failure: Task 122 in stage 2.0 failed 1 times, most recent failure: Lost task 122.0 in stage 2.0 (TID 12) (192.168.68.84 executor driver): java.lang.ClassCastException: class org.apache.spark.sql.streaming.ClickState cannot be cast to class org.apache.spark.sql.streaming.ClickEvent (org.apache.spark.sql.streaming.ClickState and org.apache.spark.sql.streaming.ClickEvent are in unnamed module of loader 'app')
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.CreateNamedStruct_0$(Unknown Source)
>  {code}
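The failure mode in this report can be modeled without Spark. The sketch below is a toy, not Spark's implementation: `ToyEncoder`, `ClickEventEncoder`, `ClickStateEncoder`, `ToyGroupState`, and `runGroup` are hypothetical, and the field lists given to `ClickEvent` and `ClickState` are assumptions (only the type names come from the test suite). Each toy encoder casts its input to the type it was built for, much as the generated projection in the stack trace does, and the toy state holder serializes only on `update`. That combination is why a state function that never updates state hides an encoder built from the input type.

```scala
// Hypothetical stand-ins for the test suite's types; the real fields may differ.
final case class ClickEvent(id: String, timestamp: Long)
final case class ClickState(id: String, count: Int)

// Toy encoder: casts the incoming object to the type it was built for,
// then reads its fields (a rough analogue of a generated projection).
trait ToyEncoder {
  def toRow(obj: Any): Seq[Any]
}

object ClickEventEncoder extends ToyEncoder {
  def toRow(obj: Any): Seq[Any] = {
    val e = obj.asInstanceOf[ClickEvent] // throws ClassCastException for a ClickState
    Seq(e.id, e.timestamp)
  }
}

object ClickStateEncoder extends ToyEncoder {
  def toRow(obj: Any): Seq[Any] = {
    val s = obj.asInstanceOf[ClickState]
    Seq(s.id, s.count)
  }
}

// Toy state holder: the encoder is only invoked when update() is called.
final class ToyGroupState[S](stateEncoder: ToyEncoder) {
  private var stored: Option[Seq[Any]] = None
  def exists: Boolean = stored.isDefined
  def update(newState: S): Unit = stored = Some(stateEncoder.toRow(newState))
}

// Runs one group through a user function using the given state encoder.
def runGroup(
    key: String,
    values: Seq[ClickEvent],
    stateEncoder: ToyEncoder,
    func: (String, Iterator[ClickEvent], ToyGroupState[ClickState]) => Iterator[ClickState]
): List[ClickState] =
  func(key, values.iterator, new ToyGroupState[ClickState](stateEncoder)).toList

val events = Seq(ClickEvent("a", 1L), ClickEvent("a", 2L))

// Like the original test: never updates state, so the encoder is never used.
val noUpdate = (key: String, values: Iterator[ClickEvent], state: ToyGroupState[ClickState]) =>
  Iterator(ClickState(key, values.size))

// Like the modified stateFunc: updating state forces serialization.
val withUpdate = (key: String, values: Iterator[ClickEvent], state: ToyGroupState[ClickState]) => {
  if (state.exists) throw new IllegalArgumentException("state.exists should be false")
  val newState = ClickState(key, values.size)
  state.update(newState)
  Iterator(newState)
}
```

In this model, passing `ClickEventEncoder` (an encoder derived from the input type) lets `noUpdate` succeed while `withUpdate` throws `ClassCastException`; passing `ClickStateEncoder` (an encoder for the state type) lets both succeed, which mirrors why the original test passed and the modified one failed.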



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
