[ 
https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16754873#comment-16754873
 ] 

Etienne Chauchot commented on BEAM-6517:
----------------------------------------

[~vho] What happens if you test on Flink, without the shading (as it is not 
needed)? Be careful: a fat jar shades libraries, so test it in local mode. If 
the problem is related to shading and not to Spark, [~iemejia] is better 
suited than me, as he knows shading a lot better than I do.

> Pipeline fails with deserializing lambda function on Spark (MapElements)
> ------------------------------------------------------------------------
>
>                 Key: BEAM-6517
>                 URL: https://issues.apache.org/jira/browse/BEAM-6517
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.9.0
>            Reporter: Vu Ho
>            Assignee: Vu Ho
>            Priority: Minor
>
> I'm trying to read from Parquet using the Spark runner. My initial attempt 
> failed because of a version mismatch (Spark 2.3.2 uses an older Parquet). I 
> tried shading Parquet and Avro, and the pipeline then read the Parquet 
> records successfully. However, when I tried to access a record field using a 
> lambda function, it failed with the exception shown after the code:
> {code:java}
> p.apply(FileIO.match().filepattern(options.getInputFile()))
>   .apply(FileIO.readMatches())
>   .apply(ParquetIO.readFiles(schema))
>   .apply(MapElements.into(TypeDescriptors.strings()).via(it -> it.get("name").toString()))
>   .apply(TextIO.write().to(options.getOutput()));{code}
>  
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize org.apache.beam.sdk.transforms.MapElements$1@1292071f
> at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:100)
> at org.apache.beam.runners.spark.translation.MultiDoFnFunction.<init>(MultiDoFnFunction.java:103)
> at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:374)
> at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:340)
> at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:438)
> at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:426)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> at org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:223)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: unexpected exception type
> at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
> ... 19 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
> ... 41 more
> Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
> at beamtest.examples.ParquetWordCount.$deserializeLambda$(ParquetWordCount.java:23)
> ... 51 more{code}
>  
> This doesn't happen if I use SimpleFunction, but lambda functions are often 
> more convenient and readable.
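
The trace above ends in SerializedLambda.readResolve calling 
ParquetWordCount.$deserializeLambda$, which is how Java restores a serialized 
lambda: through a synthetic method on the class that declared it. A named or 
anonymous class is restored by ordinary class-based deserialization and never 
takes that path. The following is a minimal JDK-only sketch of that 
difference, not a reproduction of the shading failure. The names 
LambdaRoundTrip, SerFn, ToUpper and roundTrip are hypothetical; SerFn only 
stands in for a serializable function type, and roundTrip only approximates 
the serialize-then-deserialize step that SerializableUtils.clone appears to 
perform in the trace.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaRoundTrip {

    /** Hypothetical stand-in for a serializable function interface. */
    public interface SerFn<A, B> extends Function<A, B>, Serializable {}

    /** Named class: restored from its class name and fields, no $deserializeLambda$ involved. */
    public static class ToUpper implements SerFn<String, String> {
        @Override
        public String apply(String s) {
            return s.toUpperCase();
        }
    }

    /** Serializes a value to bytes and reads it back with plain Java serialization. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException("unable to round-trip " + value, e);
        }
    }

    public static void main(String[] args) {
        // Lambda: written as a SerializedLambda and restored through
        // LambdaRoundTrip.$deserializeLambda$, a synthetic method added by javac.
        SerFn<String, String> lambda = s -> s.toUpperCase();
        // Named class: restored like any other Serializable object.
        SerFn<String, String> named = new ToUpper();

        System.out.println(roundTrip(lambda).apply("name"));
        System.out.println(roundTrip(named).apply("name"));
    }
}
```

Both calls succeed here because the bytecode that wrote the lambda is the same 
bytecode that reads it back. Whether shading breaks that match in the reported 
pipeline is not established by this sketch.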



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)