[ https://issues.apache.org/jira/browse/SPARK-52819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Quirin Kögl updated SPARK-52819:
--------------------------------
    Attachment: Main.java

> Structured Streaming: FlatMapGroupsWithState: 
> java.io.NotSerializableException: 
> org.apache.spark.sql.catalyst.encoders.KryoSerializationCodec$
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-52819
>                 URL: https://issues.apache.org/jira/browse/SPARK-52819
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 4.0.0
>            Reporter: Quirin Kögl
>            Priority: Major
>         Attachments: Main.java
>
>
> When trying to use a Kryo serializer for the state with the 
> Dataset.flatMapGroupsWithState function, the following exception occurs:
> org.apache.spark.SparkException: Task not serializable
>     at 
> org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:45)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2839)
>     at 
> org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.mapPartitionsWithStateStore(package.scala:67)
>     at 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute(FlatMapGroupsWithStateExec.scala:263)
>     at 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute$(FlatMapGroupsWithStateExec.scala:207)
>     at 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.doExecute(FlatMapGroupsWithStateExec.scala:403)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
>     at scala.util.Try$.apply(Try.scala:217)
>     at 
> org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
>     at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
>     at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
>     at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
>     at 
> org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:533)
>     at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:461)
>     at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:460)
>     at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:504)
>     at 
> org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:761)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
>     at scala.util.Try$.apply(Try.scala:217)
>     at 
> org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
>     at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
>     at 
> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:399)
>     at 
> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:397)
>     at 
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:329)
>     at 
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:340)
>     at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
>     at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
>     at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
>     at 
> org.apache.spark.sql.classic.Dataset.collectFromPlan(Dataset.scala:2244)
>     at 
> org.apache.spark.sql.classic.Dataset.$anonfun$collect$1(Dataset.scala:1482)
>     at 
> org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2234)
>     at 
> org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
>     at 
> org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
>     at 
> org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
>     at 
> org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
>     at 
> org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
>     at 
> org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
>     at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
>     at org.apache.spark.sql.classic.Dataset.collect(Dataset.scala:1482)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:888)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
>     at 
> org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
>     at 
> org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
>     at 
> org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
>     at 
> org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:876)
>     at 
> org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:876)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:394)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
>     at 
> org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:364)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:344)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:344)
>     at 
> org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39)
>     at 
> org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37)
>     at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:70)
>     at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:82)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:344)
>     at 
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
>     at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311)
>     at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)
>     Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: 
> Full stacktrace of original doTryWithCallerStacktrace caller
>         at 
> org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:45)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:2839)
>         at 
> org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.mapPartitionsWithStateStore(package.scala:67)
>         at 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute(FlatMapGroupsWithStateExec.scala:263)
>         at 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute$(FlatMapGroupsWithStateExec.scala:207)
>         at 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.doExecute(FlatMapGroupsWithStateExec.scala:403)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
>         at scala.util.Try$.apply(Try.scala:217)
>         at 
> org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
>         at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
>         at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
>         at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
>         at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
>         at 
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
>         at 
> org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:533)
>         at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:461)
>         at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:460)
>         at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:504)
>         at 
> org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
>         at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:761)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
>         at scala.util.Try$.apply(Try.scala:217)
>         at 
> org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
>         at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
>         at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
>         ... 66 more
> Caused by: java.io.NotSerializableException: 
> org.apache.spark.sql.catalyst.encoders.KryoSerializationCodec$
> Serialization stack:
>     - object not serializable (class: 
> org.apache.spark.sql.catalyst.encoders.KryoSerializationCodec$, value: 
> <function0>)
>     - field (class: 
> org.apache.spark.sql.catalyst.encoders.AgnosticEncoders$TransformingEncoder, 
> name: codecProvider, type: interface scala.Function0)
>     - object (class 
> org.apache.spark.sql.catalyst.encoders.AgnosticEncoders$TransformingEncoder, 
> TransformingEncoder(qukoegl.Main$State,BinaryEncoder,<function0>,false))
>     - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, 
> name: encoder, type: interface 
> org.apache.spark.sql.catalyst.encoders.AgnosticEncoder)
>     - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, 
> class[value[0]: binary])
>     - field (class: 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec, name: 
> stateEncoder, type: class 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
>     - object (class 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec, 
> FlatMapGroupsWithState 
> org.apache.spark.sql.internal.ToScalaUDF$$$Lambda$1541/0x00000116e397f3e8@6578637,
>  decodeusingserializer(value#16, qukoegl.Main$Key, true), 
> decodeusingserializer(value#10, qukoegl.Main$Input, true), 
> decodeusingserializer(value#16, qukoegl.Main$Key, true), [value#16], 
> [value#16], [value#10], [value#10], obj#21: qukoegl.Main$Result, state info [ 
> checkpoint = 
> file:/C:/Users/koeglq/AppData/Local/Temp/temporary-3e5bc956-591a-4e55-8c49-f24a41eb99ca/state,
>  runId = c125342a-6ab9-42c8-962d-c1ccdbba9ce4, opId = 0, ver = 0, 
> numPartitions = 2] stateStoreCkptIds = None, class[value[0]: binary], 2, 
> Append, NoTimeout, 1752568574400, 0, 0, false, false
> :- *(2) Sort [value#16 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(value#16, 2), REQUIRED_BY_STATEFUL_OPERATOR, 
> [plan_id=52]
> :     +- AppendColumnsWithObject 
> org.apache.spark.sql.internal.ToScalaUDF$$$Lambda$1539/0x00000116e397b708@59abcdc,
>  [encodeusingserializer(input[0, qukoegl.Main$Input, false], true) AS 
> value#10], [encodeusingserializer(input[0, qukoegl.Main$Key, false], true) AS 
> value#16]
> :        +- MapElements 
> qukoegl.Main$$Lambda$1414/0x00000116e391ec40@5d777d92, obj#8: 
> qukoegl.Main$Input
> :           +- DeserializeToObject 
> createexternalrow(static_invoke(DateTimeUtils.toJavaTimestamp(timestamp#0)), 
> static_invoke(java.lang.Long.valueOf(value#1L)), 
> StructField(timestamp,TimestampType,true), StructField(value,LongType,true)), 
> obj#6: org.apache.spark.sql.Row
> :              +- *(1) Project [timestamp#0, value#1L]
> :                 +- MicroBatchScan[timestamp#0, value#1L] class 
> org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1
> +- PlanLater LocalRelation <empty>, [value#23]
> )
>     - element of array (index: 0)
>     - array (class [Ljava.lang.Object;, size 1)
>     - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, 
> type: class [Ljava.lang.Object;)
>     - object (class java.lang.invoke.SerializedLambda, 
> SerializedLambda[capturingClass=interface 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase, 
> functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;,
>  implementation=invokeStatic 
> org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExecBase.$anonfun$doExecute$3:(Lorg/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExecBase;Lorg/apache/spark/sql/execution/streaming/state/StateStore;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
>  
> instantiatedMethodType=(Lorg/apache/spark/sql/execution/streaming/state/StateStore;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
>  numCaptured=1])
>     - writeReplace data (class: java.lang.invoke.SerializedLambda)
>     - object (class 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase$$Lambda$2446/0x00000116e3cb4c30,
>  
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase$$Lambda$2446/0x00000116e3cb4c30@72c241e1)
>     at 
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:43)
>     at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:50)
>     at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:122)
>     at 
> org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:42)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2839)
>     at 
> org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.mapPartitionsWithStateStore(package.scala:67)
>     at 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute(FlatMapGroupsWithStateExec.scala:263)
>     at 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute$(FlatMapGroupsWithStateExec.scala:207)
>     at 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.doExecute(FlatMapGroupsWithStateExec.scala:403)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
>     at scala.util.Try$.apply(Try.scala:217)
>     at 
> org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
>     at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
>     at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
>     at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
>     at 
> org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:533)
>     at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:461)
>     at 
> org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:460)
>     at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:504)
>     at 
> org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:761)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
>     at scala.util.Try$.apply(Try.scala:217)
>     at 
> org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
>     at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
>     at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
>     ... 66 more
> Seemingly Spark tries to serialize the Kryo serialization codec as part of the 
> task closure, but this is not possible since KryoSerializationCodec$ and 
> scala.Function0 are not serializable.
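> As a minimal standalone illustration of the mechanism (plain java.io 
> serialization, not Spark code; all names in this sketch are made up), Java 
> serialization fails with NotSerializableException as soon as a reachable field 
> references a non-serializable object, which is what happens here when the task 
> closure drags in the encoder's codecProvider:
> import java.io.ByteArrayOutputStream;
> import java.io.NotSerializableException;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
>
> public class ClosureSerializationDemo {
>     // Stands in for KryoSerializationCodec$ / scala.Function0: not Serializable.
>     static final class NonSerializableCodecProvider {
>     }
>
>     // Stands in for the TransformingEncoder captured by the task closure.
>     static final class CapturingClosure implements Serializable {
>         // This single field makes the otherwise serializable object fail to serialize.
>         final NonSerializableCodecProvider codecProvider = new NonSerializableCodecProvider();
>     }
>
>     public static void main(String[] args) throws Exception {
>         try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
>             out.writeObject(new CapturingClosure());
>         } catch (NotSerializableException e) {
>             // Reports the offending class, analogous to the serialization stack above.
>             System.out.println("NotSerializableException: " + e.getMessage());
>         }
>     }
> }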
> To reproduce this issue:
> Use the following dependencies:
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-sql_2.13</artifactId>
>     <version>4.0.0</version>
> </dependency>
> <dependency>
>     <groupId>org.projectlombok</groupId>
>     <artifactId>lombok</artifactId>
>     <version>1.18.38</version>
> </dependency>
> And the following code snippet:
> import io.jsonwebtoken.lang.Collections;
> import lombok.AllArgsConstructor;
> import lombok.Data;
> import lombok.NoArgsConstructor;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.function.FlatMapGroupsFunction;
> import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction;
> import org.apache.spark.api.java.function.MapFunction;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Encoders;
> import org.apache.spark.sql.KeyValueGroupedDataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.classic.SparkSession;
> import org.apache.spark.sql.streaming.DataStreamReader;
> import org.apache.spark.sql.streaming.GroupState;
> import org.apache.spark.sql.streaming.GroupStateTimeout;
> import org.apache.spark.sql.streaming.OutputMode;
> import org.apache.spark.sql.streaming.StreamingQueryException;
>
> import java.io.Serializable;
> import java.sql.Timestamp;
> import java.util.Iterator;
> import java.util.concurrent.TimeoutException;
>
> public class Main {
>     public static void main(String[] args) throws TimeoutException, StreamingQueryException {
>         SparkSession.Builder b = SparkSession.builder();
>
>         SparkConf conf = new SparkConf()
>                 .set("spark.sql.shuffle.partitions", "2")
>                 .setMaster("local[2]");
>
>         SparkSession session = b.config(conf)
>                 .getOrCreate();
>
>         DataStreamReader reader = session.readStream();
>
>         // Rate source as a simple streaming input.
>         Dataset<Row> ds = reader.format("rate")
>                 .option("rowsPerSecond", 1).load();
>
>         Dataset<Input> mapped = ds.map((MapFunction<Row, Input>) e ->
>                         new Input(new Key((int) (e.getLong(1) % 10)), e.getLong(1), e.getTimestamp(0)),
>                 Encoders.kryo(Input.class));
>
>         KeyValueGroupedDataset<Key, Input> keyed = mapped.groupByKey(
>                 (MapFunction<Input, Key>) Input::getKey, Encoders.kryo(Key.class));
>
>         Dataset<Result> res = flatMapGroupState(keyed);
>
>         res.writeStream()
>                 .format("console")
>                 .start()
>                 .awaitTermination();
>     }
>
>     private static Dataset<Result> flatMapGroupState(KeyValueGroupedDataset<Key, Input> keyed) {
>         return keyed.flatMapGroupsWithState(new FlatMapGroupsWithStateFunction<Key, Input, State, Result>() {
>             @Override
>             public Iterator<Result> call(Key key, Iterator<Input> iterator, GroupState<State> groupState)
>                     throws Exception {
>                 State state = groupState.getOption().getOrElse(() -> new State(key, 0));
>
>                 iterator.forEachRemaining(pojo -> state.setCount(state.getCount() + 1));
>
>                 groupState.update(state);
>
>                 // Collections is io.jsonwebtoken.lang.Collections (jjwt), not listed in the dependencies above.
>                 return Collections.of(new Result(state.getKey(), state.getCount())).iterator();
>             }
>         }, OutputMode.Append(),
>                 // The Kryo encoder for the state type is what triggers the NotSerializableException.
>                 Encoders.kryo(State.class),
>                 Encoders.kryo(Result.class),
>                 GroupStateTimeout.NoTimeout());
>     }
>
>     @Data
>     @AllArgsConstructor
>     @NoArgsConstructor
>     public static class Key implements Serializable {
>         int value;
>     }
>
>     @AllArgsConstructor
>     @Data
>     @NoArgsConstructor
>     public static class Result implements Serializable {
>         private Key key;
>         private long count;
>     }
>
>     @AllArgsConstructor
>     @Data
>     @NoArgsConstructor
>     public static class State implements Serializable {
>         private Key key;
>         private long count;
>     }
>
>     @Data
>     @AllArgsConstructor
>     @NoArgsConstructor
>     public static class Input implements Serializable {
>         Key key;
>         long index;
>         Timestamp timestamp;
>     }
>
>     private static Dataset<Result> flatMapGroup(KeyValueGroupedDataset<Key, Input> keyed) {
>         return keyed.flatMapGroups(new FlatMapGroupsFunction<Key, Input, Result>() {
>             @Override
>             public Iterator<Result> call(Key key, Iterator<Input> iterator) throws Exception {
>                 int count = 0;
>                 while (iterator.hasNext()) {
>                     iterator.next();
>                     count++;
>                 }
>                 return Collections.of(new Result(key, count)).iterator();
>             }
>         }, Encoders.kryo(Result.class));
>     }
> }
> If you simply change Encoders.kryo(State.class) to Encoders.bean(State.class), 
> this example starts working.
> I could not find a feasible workaround, so I created this as a Major issue; I 
> hope that fits.
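> For reference, a sketch of what the working variant of the flatMapGroupsWithState 
> call looks like (assuming the Key/Input/State/Result classes and imports from the 
> reproduction above; the method name flatMapGroupStateWithBeanState is made up 
> here, and only the state encoder argument differs from the failing version):
>     private static Dataset<Result> flatMapGroupStateWithBeanState(KeyValueGroupedDataset<Key, Input> keyed) {
>         return keyed.flatMapGroupsWithState(new FlatMapGroupsWithStateFunction<Key, Input, State, Result>() {
>             @Override
>             public Iterator<Result> call(Key key, Iterator<Input> iterator, GroupState<State> groupState)
>                     throws Exception {
>                 State state = groupState.getOption().getOrElse(() -> new State(key, 0));
>                 iterator.forEachRemaining(pojo -> state.setCount(state.getCount() + 1));
>                 groupState.update(state);
>                 return Collections.of(new Result(state.getKey(), state.getCount())).iterator();
>             }
>         }, OutputMode.Append(),
>                 Encoders.bean(State.class),   // bean encoder for the state instead of Encoders.kryo(State.class)
>                 Encoders.kryo(Result.class),
>                 GroupStateTimeout.NoTimeout());
>     }
> Encoders.bean(State.class) relies on State being a Java bean (no-arg constructor 
> plus getters and setters), which the Lombok @Data/@NoArgsConstructor annotations 
> in the reproduction already provide.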



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
