[ 
https://issues.apache.org/jira/browse/BEAM-12265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mazlum Tosun updated BEAM-12265:
--------------------------------
    Comment: was deleted

(was: I finally found the solution: I downgraded the Kotlin version 
(dependencies + plugin) to 1.4.21.

With this version, the "Non-serializable lambda" problem disappears and the Kotlin 
Maven plugin no longer hits the VirtualFile problem at compile time: 
https://stackoverflow.com/questions/66170900/kotlin-1-4-30-apache-beam-compilation-error

This issue helped me a lot, thanks: 
https://youtrack.jetbrains.com/issue/KT-45067

It would be great if a future Kotlin Maven plugin worked 
correctly with 1.4.x versions later than 1.4.21.)
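
For illustration only: in the pom.xml quoted below, the Kotlin dependencies and the kotlin-maven-plugin all take their version from the kotlin.version property, so the downgrade described above presumably comes down to one property change. This is a sketch that assumes the rest of the pom.xml stays as reported. The deleted comment does not show the actual change, and other versions (for example serialization.version) may also need adjusting.

{code:xml}
<properties>
  <!-- Assumed change: pin the Kotlin stdlib, test artifacts and Maven plugin to 1.4.21 -->
  <kotlin.version>1.4.21</kotlin.version>
</properties>
{code}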

> FlatMapElement Kotlin Beam non Serializable lambda
> --------------------------------------------------
>
>                 Key: BEAM-12265
>                 URL: https://issues.apache.org/jira/browse/BEAM-12265
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-community
>            Reporter: Mazlum Tosun
>            Priority: P0
>             Fix For: 2.27.0
>
>
> I have an existing Apache Beam project with Java 8, Apache Beam 2.27.0, Maven 
> and Dagger 2.
> I migrated this project to Kotlin (Kotlin JDK 8, version 1.5.0).
> I used Kotlin 1.5.0 because 1.4.30 had an issue with Beam 
> and the Maven plugin (Could not read class: VirtualFile: 
> [https://stackoverflow.com/questions/66170900/kotlin-1-4-30-apache-beam-compilation-error]).
> Everything seems to work except the native MapElements or 
> FlatMapElements transforms with a TypeDescriptor and a lambda expression.
> Here is part of my pom.xml file:
> {code:xml}
> <properties>
>  <beam.version>2.27.0</beam.version>
> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>  <kotlin.code.style>official</kotlin.code.style>
>  <kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
>  <kotlin.compiler.incremental>true</kotlin.compiler.incremental>
> <kotlin.version>1.5.0</kotlin.version>
>  <serialization.version>1.2.0</serialization.version>
>  <java.version>1.8</java.version>
> <dagger.version>2.35.1</dagger.version>
>  <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
>  <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
>  <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
>  </properties>
> <dependencies>
>  <dependency>
>  <groupId>org.jetbrains.kotlin</groupId>
>  <artifactId>kotlin-stdlib-jdk8</artifactId>
>  <version>${kotlin.version}</version>
>  </dependency>
>  <dependency>
>  <groupId>org.jetbrains.kotlinx</groupId>
>  <artifactId>kotlinx-serialization-json</artifactId>
>  <version>${serialization.version}</version>
>  </dependency>
> <dependency>
>  <groupId>org.apache.beam</groupId>
>  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
>  <version>${beam.version}</version>
>  <scope>runtime</scope>
>  </dependency>
> <dependency>
>  <groupId>org.apache.beam</groupId>
>  <artifactId>beam-sdks-java-core</artifactId>
>  <version>${beam.version}</version>
>  </dependency>
> <dependency>
>  <groupId>org.apache.beam</groupId>
>  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
>  <version>${beam.version}</version>
>  </dependency>
> <dependency>
>  <groupId>org.apache.beam</groupId>
>  <artifactId>beam-sdks-java-io-redis</artifactId>
>  <version>${beam.version}</version>
>  </dependency>
>  <dependency>
>  <groupId>org.jetbrains.kotlin</groupId>
>  <artifactId>kotlin-test-junit</artifactId>
>  <version>${kotlin.version}</version>
>  <scope>test</scope>
>  </dependency>
>  </dependencies>
> <build>
>  <plugins>
> <plugin>
>  <groupId>org.jetbrains.kotlin</groupId>
>  <artifactId>kotlin-maven-plugin</artifactId>
>  <version>${kotlin.version}</version>
>  <executions>
>  <execution>
>  <id>kapt</id>
>  <goals>
>  <goal>kapt</goal>
>  </goals>
>  <configuration>
>  <sourceDirs>
>  <sourceDir>src/main/kotlin</sourceDir>
>  </sourceDirs>
>  <annotationProcessorPaths>
>  <annotationProcessorPath>
>  <groupId>com.google.dagger</groupId>
>  <artifactId>dagger-compiler</artifactId>
>  <version>${dagger.version}</version>
>  </annotationProcessorPath>
>  </annotationProcessorPaths>
>  </configuration>
>  </execution>
>  <execution>
>  <id>compile</id>
>  <phase>process-sources</phase>
>  <goals>
>  <goal>compile</goal>
>  </goals>
>  <configuration>
>  <sourceDirs>
>  <sourceDir>src/main/kotlin</sourceDir>
>  </sourceDirs>
>  </configuration>
>  </execution>
> <execution>
>  <id>test-kapt</id>
>  <goals>
>  <goal>test-kapt</goal>
>  </goals>
>  <configuration>
>  <sourceDirs>
>  <sourceDir>src/test/kotlin</sourceDir>
>  </sourceDirs>
>  <annotationProcessorPaths>
>  <annotationProcessorPath>
>  <groupId>com.google.dagger</groupId>
>  <artifactId>dagger-compiler</artifactId>
>  <version>${dagger.version}</version>
>  </annotationProcessorPath>
>  </annotationProcessorPaths>
>  </configuration>
>  </execution>
>  <execution>
>  <id>test-compile</id>
>  <goals>
>  <goal>test-compile</goal>
>  </goals>
>  <configuration>
>  <sourceDirs>
>  <sourceDir>src/test/kotlin</sourceDir>
>  <sourceDir>target/generated-sources/kapt/test</sourceDir>
>  </sourceDirs>
>  </configuration>
>  </execution>
>  </executions>
>  <configuration>
>  <compilerPlugins>
>  <plugin>kotlinx-serialization</plugin>
>  </compilerPlugins>
>  </configuration>
>  <dependencies>
>  <dependency>
>  <groupId>org.jetbrains.kotlin</groupId>
>  <artifactId>kotlin-maven-serialization</artifactId>
>  <version>${kotlin.version}</version>
>  </dependency>
>  </dependencies>
>  </plugin>
> <plugin>
>  <groupId>org.apache.maven.plugins</groupId>
>  <artifactId>maven-surefire-plugin</artifactId>
>  <version>${maven-surefire-plugin.version}</version>
>  <dependencies>
>  <dependency>
>  <groupId>org.apache.maven.surefire</groupId>
>  <artifactId>surefire-junit47</artifactId>
>  <version>${maven-surefire-plugin.version}</version>
>  </dependency>
>  </dependencies>
>  </plugin>
> <plugin>
>  <groupId>org.codehaus.mojo</groupId>
>  <artifactId>exec-maven-plugin</artifactId>
>  <version>${maven-exec-plugin.version}</version>
>  <configuration>
>  <cleanupDaemonThreads>false</cleanupDaemonThreads>
>  </configuration>
>  </plugin>
>  </plugins>
> </build>{code}
>  
> An object that implements Serializable (java.io):
> {code:java}
> data class MyObject(
>  val field: String = ""
>  ) : Serializable{code}
> Basically, I want to execute a FlatMapElements with a TypeDescriptor and a 
> lambda (behind the scenes, a SerializableFunction):
> {code:java}
> class MyTransform(private val redisConnectionConf: RedisConnectionConfiguration) :
>     PTransform<PBegin, PCollection<MyObject>>() {
>
>     override fun expand(input: PBegin): PCollection<MyObject> {
>         return input
>             .apply(RedisIO.read().withConnectionConfiguration(redisConnectionConf).withKeyPattern("my-pattern*"))
>             .apply(
>                 FlatMapElements.into(of(MyObject::class.java))
>                     .via(SerializableFunction<KV<String, String>, List<MyObject>> { toMyObjects(it) })
>             )
>     }
>
>     // Deserializes the Redis value (a JSON array) into a list of MyObject.
>     fun toMyObjects(entry: KV<String, String>): List<MyObject> {
>         val key = entry.key
>         val value = entry.value
>         val ref = object : TypeReference<List<MyObject>>() {}
>         return OBJECT_MAPPER.readValue(value, ref)
>     }
> }{code}
> I deliberately changed the code and moved part of it into the method 
> "toMyObjects" in order to show as many details as possible.
>  The "OBJECT_MAPPER" object is a Jackson ObjectMapper.
> With Java 8 and Beam 2.27.0 this basic code works perfectly fine.
> With Kotlin this code doesn't work and fails with the following error:
> {code:java}
>  at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:59)
>  at org.apache.beam.runners.core.construction.ParDoTranslation.translateDoFn (ParDoTranslation.java:692)
>  at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator$1.translateDoFn (PrimitiveParDoSingleFactory.java:218)
>  at org.apache.beam.runners.core.construction.ParDoTranslation.payloadForParDoLike (ParDoTranslation.java:814)
>  at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle (PrimitiveParDoSingleFactory.java:214)
>  at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate (PrimitiveParDoSingleFactory.java:163)
>  at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:429)
>  at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:239)
>  at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
>  at org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform (PipelineTranslation.java:87)
>  at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:587)
>  at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
>  at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
>  at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
>  at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
>  at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
>  at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
>  at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:59)
>  at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:933)
>  at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:196)
>  at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
>  at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
>  at myPackage.MyApp.main (MyApp.kt:44)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke (Method.java:498)
>  at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
>  at java.lang.Thread.run (Thread.java:748)
>  Caused by: java.io.NotSerializableException: Non-serializable lambda
>  at mypackage.MyTransform$$Lambda$783/1784079343.writeObject (Unknown Source)
> [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project my-project: 
>  An exception occured while executing the Java class. unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.FlatMapElements$2@23402e70, mainOutputTag=Tag<org.apache.beam.sdk.values.PCollection.<init>:402#6929f09b03d242ca>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}: Non-serializable lambda -> [Help 1]{code}
> The SerializableUtils.serializeToByteArray method in the Beam SDK throws this 
> error: java.io.NotSerializableException: Non-serializable lambda
> MyObject is Serializable and the lambda is wrapped in a Beam 
> SerializableFunction (a function interface that extends Serializable).
> Normally in this case, Beam takes a SerializableCoder from the Serializable 
> object.
>  I don't understand why Beam sees the lambda as non-serializable.
> I don't get this behaviour in plain Java.
> To be precise: if I replace the FlatMapElements/TypeDescriptor/lambda with a 
> ParDo.of(DoFn), it works fine, but in some cases, for better concision and 
> readability, I want to use the built-in MapElements and FlatMapElements with 
> lambda expressions.
> Thanks in advance for your help.
>  
> I ran the same test in a small project with only the required dependencies 
> (Kotlin 1.5.0 and Beam 2.27.0 with Maven) and I get exactly the same issue.
> There is also a Stack Overflow question: 
> [https://stackoverflow.com/questions/67341499/flatmapelement-kotlin-beam-non-serializable-lambda]
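
A possible direction to check, not something this report confirms: Kotlin 1.5.0 started compiling SAM conversions of Java interfaces through invokedynamic, and the Kotlin 1.5.0 release notes document a compiler option, -Xsam-conversions=class, that switches back to the older class-based scheme. The fragment below is a minimal sketch of passing that option through the kotlin-maven-plugin configuration from the pom.xml above. Whether it removes the "Non-serializable lambda" exception in this project is an assumption.

{code:xml}
<plugin>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-maven-plugin</artifactId>
  <version>${kotlin.version}</version>
  <configuration>
    <args>
      <!-- Assumption: restore class-based SAM conversion instead of invokedynamic -->
      <arg>-Xsam-conversions=class</arg>
    </args>
    <compilerPlugins>
      <plugin>kotlinx-serialization</plugin>
    </compilerPlugins>
  </configuration>
</plugin>
{code}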



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
