[ https://issues.apache.org/jira/browse/SPARK-52819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Quirin Kögl updated SPARK-52819: -------------------------------- Attachment: Main.java > Structured Streaming: FlatMapGroupsWithState: > java.io.NotSerializableException: > org.apache.spark.sql.catalyst.encoders.KryoSerializationCodec$ > ---------------------------------------------------------------------------------------------------------------------------------------------- > > Key: SPARK-52819 > URL: https://issues.apache.org/jira/browse/SPARK-52819 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 4.0.0 > Reporter: Quirin Kögl > Priority: Major > Attachments: Main.java > > > When trying to use a kryo serializer for the State using the > Dataset.flatMapGroupsWithState function. > The following exception occurs: > org.apache.spark.SparkException: Task not serializable > at > org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:45) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2839) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.mapPartitionsWithStateStore(package.scala:67) > at > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute(FlatMapGroupsWithStateExec.scala:263) > at > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute$(FlatMapGroupsWithStateExec.scala:207) > at > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.doExecute(FlatMapGroupsWithStateExec.scala:403) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188) > at scala.util.Try$.apply(Try.scala:217) > at > org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378) > at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46) > at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46) > at org.apache.spark.util.LazyTry.get(LazyTry.scala:58) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197) > at > org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:533) > at > org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:461) > at > org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:460) > at > org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:504) > at > org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:761) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188) > at scala.util.Try$.apply(Try.scala:217) > at > org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439) > at org.apache.spark.util.LazyTry.get(LazyTry.scala:58) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257) > at 
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197) > at > org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:399) > at > org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:397) > at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:329) > at > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:340) > at > org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43) > at > org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43) > at > org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49) > at > org.apache.spark.sql.classic.Dataset.collectFromPlan(Dataset.scala:2244) > at > org.apache.spark.sql.classic.Dataset.$anonfun$collect$1(Dataset.scala:1482) > at > org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2234) > at > org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654) > at > org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162) > at > org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124) > at > org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) > at > org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112) > at > org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106) > at > org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123) > at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233) > at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232) > at org.apache.spark.sql.classic.Dataset.collect(Dataset.scala:1482) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:888) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162) > at > org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124) > at > org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) > at > org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112) > at > org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106) > at > org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111) > at > 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123) > at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:876) > at > org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:876) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:394) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) > at > org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:364) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:344) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:344) > at > org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39) > at > org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:70) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:82) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:344) > at > org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) > at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226) > Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: > Full stacktrace of original doTryWithCallerStacktrace caller > at > org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:45) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2839) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.mapPartitionsWithStateStore(package.scala:67) > at > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute(FlatMapGroupsWithStateExec.scala:263) > at > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute$(FlatMapGroupsWithStateExec.scala:207) > at > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.doExecute(FlatMapGroupsWithStateExec.scala:403) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188) > at 
scala.util.Try$.apply(Try.scala:217) > at > org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378) > at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46) > at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46) > at org.apache.spark.util.LazyTry.get(LazyTry.scala:58) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257) > at > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197) > at > org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:533) > at > org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:461) > at > org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:460) > at > org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:504) > at > org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:761) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188) > at scala.util.Try$.apply(Try.scala:217) > at > org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378) > at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46) > at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46) > ... 66 more > Caused by: java.io.NotSerializableException: > org.apache.spark.sql.catalyst.encoders.KryoSerializationCodec$ > Serialization stack: > - object not serializable (class: > org.apache.spark.sql.catalyst.encoders.KryoSerializationCodec$, value: > <function0>) > - field (class: > org.apache.spark.sql.catalyst.encoders.AgnosticEncoders$TransformingEncoder, > name: codecProvider, type: interface scala.Function0) > - object (class > org.apache.spark.sql.catalyst.encoders.AgnosticEncoders$TransformingEncoder, > TransformingEncoder(qukoegl.Main$State,BinaryEncoder,<function0>,false)) > - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, > name: encoder, type: interface > org.apache.spark.sql.catalyst.encoders.AgnosticEncoder) > - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, > class[value[0]: binary]) > - field (class: > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec, name: > stateEncoder, type: class > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder) > - object (class > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec, > FlatMapGroupsWithState > org.apache.spark.sql.internal.ToScalaUDF$$$Lambda$1541/0x00000116e397f3e8@6578637, > decodeusingserializer(value#16, qukoegl.Main$Key, true), > decodeusingserializer(value#10, qukoegl.Main$Input, true), > decodeusingserializer(value#16, qukoegl.Main$Key, true), [value#16], > [value#16], [value#10], [value#10], obj#21: qukoegl.Main$Result, state info [ > checkpoint = > file:/C:/Users/koeglq/AppData/Local/Temp/temporary-3e5bc956-591a-4e55-8c49-f24a41eb99ca/state, > runId = c125342a-6ab9-42c8-962d-c1ccdbba9ce4, opId = 0, ver = 0, > numPartitions = 2] stateStoreCkptIds = None, class[value[0]: binary], 2, > Append, NoTimeout, 1752568574400, 0, 0, false, false > :- *(2) Sort [value#16 ASC NULLS FIRST], false, 0 > : +- Exchange 
hashpartitioning(value#16, 2), REQUIRED_BY_STATEFUL_OPERATOR, > [plan_id=52] > : +- AppendColumnsWithObject > org.apache.spark.sql.internal.ToScalaUDF$$$Lambda$1539/0x00000116e397b708@59abcdc, > [encodeusingserializer(input[0, qukoegl.Main$Input, false], true) AS > value#10], [encodeusingserializer(input[0, qukoegl.Main$Key, false], true) AS > value#16] > : +- MapElements > qukoegl.Main$$Lambda$1414/0x00000116e391ec40@5d777d92, obj#8: > qukoegl.Main$Input > : +- DeserializeToObject > createexternalrow(static_invoke(DateTimeUtils.toJavaTimestamp(timestamp#0)), > static_invoke(java.lang.Long.valueOf(value#1L)), > StructField(timestamp,TimestampType,true), StructField(value,LongType,true)), > obj#6: org.apache.spark.sql.Row > : +- *(1) Project [timestamp#0, value#1L] > : +- MicroBatchScan[timestamp#0, value#1L] class > org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1 > +- PlanLater LocalRelation <empty>, [value#23] > ) > - element of array (index: 0) > - array (class [Ljava.lang.Object;, size 1) > - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, > type: class [Ljava.lang.Object;) > - object (class java.lang.invoke.SerializedLambda, > SerializedLambda[capturingClass=interface > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase, > functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, > implementation=invokeStatic > org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExecBase.$anonfun$doExecute$3:(Lorg/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExecBase;Lorg/apache/spark/sql/execution/streaming/state/StateStore;Lscala/collection/Iterator;)Lscala/collection/Iterator;, > > instantiatedMethodType=(Lorg/apache/spark/sql/execution/streaming/state/StateStore;Lscala/collection/Iterator;)Lscala/collection/Iterator;, > numCaptured=1]) > - writeReplace data (class: java.lang.invoke.SerializedLambda) > - object (class > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase$$Lambda$2446/0x00000116e3cb4c30, > > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase$$Lambda$2446/0x00000116e3cb4c30@72c241e1) > at > org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:43) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:50) > at > org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:122) > at > org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:42) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2839) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.mapPartitionsWithStateStore(package.scala:67) > at > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute(FlatMapGroupsWithStateExec.scala:263) > at > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.doExecute$(FlatMapGroupsWithStateExec.scala:207) > at > org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.doExecute(FlatMapGroupsWithStateExec.scala:403) > at > org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188) > at scala.util.Try$.apply(Try.scala:217) > at > org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378) > at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46) > at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46) > at org.apache.spark.util.LazyTry.get(LazyTry.scala:58) > 
> at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
> at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
> at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:533)
> at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:461)
> at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:460)
> at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:504)
> at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:761)
> at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeRDD$1(SparkPlan.scala:188)
> at scala.util.Try$.apply(Try.scala:217)
> at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
> at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
> at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
> ... 66 more
>
> It appears that Spark tries to serialize the Kryo serialization codec, but this is not possible because KryoSerializationCodec$ and scala.Function0 are not serializable.
>
> To reproduce this issue, use the following dependencies:

{code:xml}
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.38</version>
</dependency>
{code}

> and the following code snippet:

{code:java}
import io.jsonwebtoken.lang.Collections; // jjwt utility used below via Collections.of(...)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.classic.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.sql.streaming.GroupStateTimeout;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;

public class Main {

    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession.Builder b = SparkSession.builder();

        SparkConf conf = new SparkConf()
                .set("spark.sql.shuffle.partitions", "2")
                .setMaster("local[2]");

        SparkSession session = b.config(conf)
                .getOrCreate();

        DataStreamReader reader = session.readStream();

        Dataset<Row> ds = reader.format("rate")
                .option("rowsPerSecond", 1).load();

        Dataset<Input> mapped = ds.map(
                (MapFunction<Row, Input>) e -> new Input(
                        new Key((int) (e.getLong(1) % 10)), e.getLong(1), e.getTimestamp(0)),
                Encoders.kryo(Input.class));

        KeyValueGroupedDataset<Key, Input> keyed = mapped.groupByKey(
                (MapFunction<Input, Key>) Input::getKey,
                Encoders.kryo(Key.class));

        Dataset<Result> res = flatMapGroupState(keyed);

        res.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }

    private static Dataset<Result> flatMapGroupState(KeyValueGroupedDataset<Key, Input> keyed) {
        return keyed.flatMapGroupsWithState(new FlatMapGroupsWithStateFunction<Key, Input, State, Result>() {
            @Override
            public Iterator<Result> call(Key key, Iterator<Input> iterator, GroupState<State> groupState)
                    throws Exception {
                State state = groupState.getOption().getOrElse(() -> new State(key, 0));

                iterator.forEachRemaining(pojo -> state.setCount(state.getCount() + 1));

                groupState.update(state);

                return Collections.of(new Result(state.getKey(), state.getCount())).iterator();
            }
            // Using Encoders.kryo(State.class) as the state encoder triggers
            // "org.apache.spark.SparkException: Task not serializable"
        }, OutputMode.Append(), Encoders.kryo(State.class), Encoders.kryo(Result.class),
                GroupStateTimeout.NoTimeout());
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Key implements Serializable {
        int value;
    }

    @AllArgsConstructor
    @Data
    @NoArgsConstructor
    public static class Result implements Serializable {
        private Key key;
        private long count;
    }

    @AllArgsConstructor
    @Data
    @NoArgsConstructor
    public static class State implements Serializable {
        private Key key;
        private long count;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Input implements Serializable {
        Key key;
        long index;
        Timestamp timestamp;
    }

    private static Dataset<Result> flatMapGroup(KeyValueGroupedDataset<Key, Input> keyed) {
        return keyed.flatMapGroups(new FlatMapGroupsFunction<Key, Input, Result>() {
            @Override
            public Iterator<Result> call(Key key, Iterator<Input> iterator) throws Exception {
                int count = 0;
                while (iterator.hasNext()) {
                    iterator.next();
                    count++;
                }
                return Collections.of(new Result(key, count)).iterator();
            }
        }, Encoders.kryo(Result.class));
    }
}
{code}

> If you simply change Encoders.kryo(State.class) to Encoders.bean(State.class), this example starts working.
> I could not find a feasible workaround, so I created this as a Major issue; I hope that fits.
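> For completeness, here is a minimal sketch of the Encoders.bean change mentioned above. Only the state encoder differs from the snippet; stateFunction is a placeholder for the FlatMapGroupsWithStateFunction<Key, Input, State, Result> shown there (State already has a no-args constructor and getters/setters via Lombok, which the bean encoder needs):

{code:java}
// Sketch of the state-encoder swap: keep Kryo for key/input/result, use the bean
// encoder only for the state. "stateFunction" stands for the function defined above.
Dataset<Result> res = keyed.flatMapGroupsWithState(
        stateFunction,
        OutputMode.Append(),
        Encoders.bean(State.class),   // was Encoders.kryo(State.class), which triggers the exception
        Encoders.kryo(Result.class),
        GroupStateTimeout.NoTimeout());
{code}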
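> As a side note on the mechanism: the serialization stack above shows the state ExpressionEncoder holding a TransformingEncoder whose codecProvider is a Function0 pointing at the KryoSerializationCodec$ object, which does not implement Serializable, so the closure cleaner's Java-serialization check fails. The following standalone example (all names hypothetical, not Spark code) fails in the same way:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Standalone illustration only: a Serializable object that references a
// non-serializable provider singleton cannot be Java-serialized, mirroring how
// the state encoder fails when the closure cleaner serializes the closure.
public class NotSerializableDemo {

    // Stands in for KryoSerializationCodec$ (a singleton that is not Serializable).
    static final class CodecProvider implements Supplier<String> {
        static final CodecProvider INSTANCE = new CodecProvider();
        @Override
        public String get() { return "codec"; }
    }

    // Stands in for the encoder captured by the stateful operator's closure.
    static final class Holder implements Serializable {
        final Supplier<String> codecProvider = CodecProvider.INSTANCE; // non-serializable field
    }

    public static void main(String[] args) throws Exception {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new Holder()); // throws java.io.NotSerializableException: ...CodecProvider
        }
    }
}
{code}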