[ https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16754873#comment-16754873 ]
Etienne Chauchot commented on BEAM-6517:
----------------------------------------

[~vho] What happens if you test on Flink and without the shade (as it is not needed there)? Careful: a fat jar shades libraries, so test it in local mode. If it is related to the shade and not to Spark, [~iemejia] is better suited than me, as he knows shading a lot better than I do.

> Pipeline fails with deserializing lambda function on Spark (MapElements)
> ------------------------------------------------------------------------
>
>                 Key: BEAM-6517
>                 URL: https://issues.apache.org/jira/browse/BEAM-6517
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.9.0
>            Reporter: Vu Ho
>            Assignee: Vu Ho
>            Priority: Minor
>
> I'm trying to read from Parquet using the Spark runner. The initial attempt failed because of a version mismatch (Spark 2.3.2 uses an older Parquet). I tried shading parquet and avro, and the pipeline then read the Parquet records successfully. However, it fails when I access a record field through a lambda function:
> {code:java}
> p.apply(FileIO.match().filepattern(options.getInputFile()))
>     .apply(FileIO.readMatches())
>     .apply(ParquetIO.readFiles(schema))
>     .apply(MapElements.into(TypeDescriptors.strings())
>         .via(it -> it.get("name").toString()))
>     .apply(TextIO.write().to(options.getOutput()));{code}
>
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize org.apache.beam.sdk.transforms.MapElements$1@1292071f
> 	at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> 	at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:100)
> 	at org.apache.beam.runners.spark.translation.MultiDoFnFunction.<init>(MultiDoFnFunction.java:103)
> 	at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:374)
> 	at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:340)
> 	at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:438)
> 	at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:426)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> 	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> 	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> 	at org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:223)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: unexpected exception type
> 	at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
> 	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> 	at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
> 	... 19 more
> Caused by: java.lang.reflect.InvocationTargetException
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
> 	... 41 more
> Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
> 	at beamtest.examples.ParquetWordCount.$deserializeLambda$(ParquetWordCount.java:23)
> 	... 51 more{code}
>
> This doesn't happen if I use SimpleFunction, but lambda functions are often more convenient and readable.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
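As context for the trace above: `SerializedLambda.readResolve` dispatches to the compiler-generated `$deserializeLambda$` method of the class that declared the lambda (here `beamtest.examples.ParquetWordCount`), matching it by the class and method names recorded at serialization time. If the shade plugin relocates that class, or the deserializing classloader resolves a different copy of it, that string-based lookup fails with "Invalid lambda deserialization". A minimal JDK-only sketch (not Beam code; the class name `LambdaCaptureDemo` is made up for illustration) showing that a serializable lambda's serialized form hard-codes its declaring class:

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

public class LambdaCaptureDemo {
    // A functional interface must extend Serializable for javac to emit the
    // writeReplace()/$deserializeLambda$ serialization hooks on the lambda.
    interface SerFn extends Serializable {
        String apply(String s);
    }

    // A lambda declared in this class: its serialized form will record
    // "LambdaCaptureDemo" as the capturing class.
    static final SerFn UPPER = s -> s.toUpperCase();

    // Extracts the capturing-class name recorded in the lambda's serialized
    // form, via the compiler-generated private writeReplace() method.
    static String capturingClassOf(SerFn fn) throws Exception {
        Method writeReplace = fn.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        SerializedLambda sl = (SerializedLambda) writeReplace.invoke(fn);
        return sl.getCapturingClass();
    }

    public static void main(String[] args) throws Exception {
        // Prints the internal name of the declaring class; if a shade
        // relocation renames the class on the classpath without rewriting
        // this recorded string, readResolve can no longer find it.
        System.out.println(capturingClassOf(UPPER));
    }
}
```

This also suggests why the reporter's `SimpleFunction` workaround helps: a named subclass deserializes through ordinary class resolution rather than the lambda's string-based `$deserializeLambda$` dispatch.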