[ https://issues.apache.org/jira/browse/SPARK-52819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-52819:
-----------------------------------
    Labels: pull-request-available  (was: )

Structured Streaming: FlatMapGroupsWithState: java.io.NotSerializableException: org.apache.spark.sql.catalyst.encoders.KryoSerializationCodec$
----------------------------------------------------------------------------------------------------------------------------------------------

                 Key: SPARK-52819
                 URL: https://issues.apache.org/jira/browse/SPARK-52819
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 4.0.0
            Reporter: Quirin Kögl
            Priority: Major
              Labels: pull-request-available
         Attachments: Main.java

When trying to use a Kryo serializer for the state with the Dataset.flatMapGroupsWithState function, the following exception occurs:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:45)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2839)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.mapPartitionsWithStateStore(package.scala:67)
    at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute(FlatMapGroupsWithStateExec.scala:263)
    at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute$(FlatMapGroupsWithStateExec.scala:207)
    at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.doExecute(FlatMapGroupsWithStateExec.scala:403)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
    at scala.util.Try$.apply(Try.scala:217)
    at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
    at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
    at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
    at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:533)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:461)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:460)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:504)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:761)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
    at scala.util.Try$.apply(Try.scala:217)
    at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
    at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:399)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:397)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:329)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:340)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.classic.Dataset.collectFromPlan(Dataset.scala:2244)
    at org.apache.spark.sql.classic.Dataset.$anonfun$collect$1(Dataset.scala:1482)
    at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2234)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
    at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
    at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
    at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
    at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
    at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
    at org.apache.spark.sql.classic.Dataset.collect(Dataset.scala:1482)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:888)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
    at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
    at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
    at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:876)
    at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:876)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:394)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:364)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:344)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:344)
    at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39)
    at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:70)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:82)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:344)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)
    Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
        at org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:45)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2839)
        at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.mapPartitionsWithStateStore(package.scala:67)
        at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute(FlatMapGroupsWithStateExec.scala:263)
        at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute$(FlatMapGroupsWithStateExec.scala:207)
        at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.doExecute(FlatMapGroupsWithStateExec.scala:403)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
        at scala.util.Try$.apply(Try.scala:217)
        at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
        at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
        at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
        at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
        at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:533)
        at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:461)
        at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:460)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:504)
        at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:761)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
        at scala.util.Try$.apply(Try.scala:217)
        at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
        at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
        at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
        ... 66 more
Caused by: java.io.NotSerializableException: org.apache.spark.sql.catalyst.encoders.KryoSerializationCodec$
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.catalyst.encoders.KryoSerializationCodec$, value: <function0>)
    - field (class: org.apache.spark.sql.catalyst.encoders.AgnosticEncoders$TransformingEncoder, name: codecProvider, type: interface scala.Function0)
    - object (class org.apache.spark.sql.catalyst.encoders.AgnosticEncoders$TransformingEncoder, TransformingEncoder(qukoegl.Main$State,BinaryEncoder,<function0>,false))
    - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: encoder, type: interface org.apache.spark.sql.catalyst.encoders.AgnosticEncoder)
    - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[value[0]: binary])
    - field (class: org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec, name: stateEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
    - object (class org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec, FlatMapGroupsWithState org.apache.spark.sql.internal.ToScalaUDF$$$Lambda$1541/0x00000116e397f3e8@6578637, decodeusingserializer(value#16, qukoegl.Main$Key, true), decodeusingserializer(value#10, qukoegl.Main$Input, true), decodeusingserializer(value#16, qukoegl.Main$Key, true), [value#16], [value#16], [value#10], [value#10], obj#21: qukoegl.Main$Result, state info [ checkpoint = file:/C:/Users/koeglq/AppData/Local/Temp/temporary-3e5bc956-591a-4e55-8c49-f24a41eb99ca/state, runId = c125342a-6ab9-42c8-962d-c1ccdbba9ce4, opId = 0, ver = 0, numPartitions = 2] stateStoreCkptIds = None, class[value[0]: binary], 2, Append, NoTimeout, 1752568574400, 0, 0, false, false
      :- *(2) Sort [value#16 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(value#16, 2), REQUIRED_BY_STATEFUL_OPERATOR, [plan_id=52]
      :     +- AppendColumnsWithObject org.apache.spark.sql.internal.ToScalaUDF$$$Lambda$1539/0x00000116e397b708@59abcdc, [encodeusingserializer(input[0, qukoegl.Main$Input, false], true) AS value#10], [encodeusingserializer(input[0, qukoegl.Main$Key, false], true) AS value#16]
      :        +- MapElements qukoegl.Main$$Lambda$1414/0x00000116e391ec40@5d777d92, obj#8: qukoegl.Main$Input
      :           +- DeserializeToObject createexternalrow(static_invoke(DateTimeUtils.toJavaTimestamp(timestamp#0)), static_invoke(java.lang.Long.valueOf(value#1L)), StructField(timestamp,TimestampType,true), StructField(value,LongType,true)), obj#6: org.apache.spark.sql.Row
      :              +- *(1) Project [timestamp#0, value#1L]
      :                 +- MicroBatchScan[timestamp#0, value#1L] class org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1
      +- PlanLater LocalRelation <empty>, [value#23]
      )
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExecBase.$anonfun$doExecute$3:(Lorg/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExecBase;Lorg/apache/spark/sql/execution/streaming/state/StateStore;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Lorg/apache/spark/sql/execution/streaming/state/StateStore;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase$$Lambda$2446/0x00000116e3cb4c30, org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase$$Lambda$2446/0x00000116e3cb4c30@72c241e1)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:43)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:50)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:122)
    at org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:42)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2839)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.mapPartitionsWithStateStore(package.scala:67)
    at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute(FlatMapGroupsWithStateExec.scala:263)
    at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute$(FlatMapGroupsWithStateExec.scala:207)
    at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.doExecute(FlatMapGroupsWithStateExec.scala:403)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
    at scala.util.Try$.apply(Try.scala:217)
    at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
    at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
    at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
    at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:533)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:461)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:460)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:504)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:761)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
    at scala.util.Try$.apply(Try.scala:217)
    at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
    at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
    at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
    ... 66 more

Apparently Spark tries to serialize the Kryo serialization codec, but this is not possible, since KryoSerializationCodec$ and scala.Function0 are not serializable.

To reproduce this issue, use the following dependencies:
{code:java}
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.38</version>
</dependency>
{code}
and the code snippet in the attached Main.java (a minimal sketch of it is included below).

If you simply change Encoders.kryo(State.class) to Encoders.bean(State.class), this example starts working.

I could not find a feasible workaround, so I filed this as a major issue; I hope that fits.
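For convenience, here is a minimal sketch of the kind of query that triggers the failure. The authoritative reproduction is the attached Main.java; the class and field names (Input, Key, State, Result), the rate source, and the per-key counting logic below are assumptions reconstructed from the physical plan in the stack trace. The only detail that matters is that Encoders.kryo(State.class) is passed as the state encoder to flatMapGroupsWithState.
{code:java}
// Hypothetical minimal reproduction (assumed shape; the actual code is in the attached Main.java).
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Collections;

import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.GroupStateTimeout;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

public class Main {

    public static class Input implements Serializable { public Timestamp timestamp; public Long value; }
    public static class Key implements Serializable { public long bucket; }
    public static class State implements Serializable { public long count; }
    public static class Result implements Serializable { public long bucket; public long count; }

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .master("local[2]")
            .appName("SPARK-52819-repro")
            .getOrCreate();

        // Rate source mapped to typed Input objects, Kryo-encoded as in the plan above.
        Dataset<Input> input = spark.readStream().format("rate").load()
            .map((MapFunction<Row, Input>) row -> {
                Input in = new Input();
                in.timestamp = row.getTimestamp(0);
                in.value = row.getLong(1);
                return in;
            }, Encoders.kryo(Input.class));

        // Trivial stateful function: count the records seen per key.
        FlatMapGroupsWithStateFunction<Key, Input, State, Result> countPerKey = (key, values, state) -> {
            State s = state.exists() ? state.get() : new State();
            while (values.hasNext()) { values.next(); s.count++; }
            state.update(s);
            Result r = new Result();
            r.bucket = key.bucket;
            r.count = s.count;
            return Collections.singletonList(r).iterator();
        };

        StreamingQuery query = input
            .groupByKey((MapFunction<Input, Key>) in -> {
                Key k = new Key();
                k.bucket = in.value % 10;
                return k;
            }, Encoders.kryo(Key.class))
            // Encoders.kryo(State.class) as the state encoder triggers
            // "Task not serializable: ...KryoSerializationCodec$";
            // Encoders.bean(State.class) makes the same query run.
            .flatMapGroupsWithState(
                countPerKey,
                OutputMode.Append(),
                Encoders.kryo(State.class),
                Encoders.kryo(Result.class),
                GroupStateTimeout.NoTimeout())
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("console")
            .start();

        query.awaitTermination();
    }
}
{code}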