pwiecek opened a new issue, #25608:
URL: https://github.com/apache/beam/issues/25608

   ### What happened?
   
   Hello,
   
   I wanted to run a simple Beam batch pipeline as a Spark job on a Dataproc cluster. Unfortunately, the job fails with a lambda deserialization error:
   
   ```
   23/02/23 11:50:15 INFO SparkRunner$Evaluator: Evaluating 
org.apache.beam.sdk.transforms.View$ToListViewDoFn@14b48f39
   23/02/23 11:50:15 INFO SparkRunner$Evaluator: Evaluating 
View.CreatePCollectionView
   23/02/23 11:50:20 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) 
(beam-spark-poc-XXXXXXXXXXXXXXXXXXXXXXXXXXXX executor 1): java.io.IOException: 
unexpected exception type
           at 
java.base/java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1512)
           at 
java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1142)
           at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2237)
           at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
           at 
java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
           at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
           at 
java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
           at 
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
           at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
           at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
           at 
java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
           at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
           at 
java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
           at 
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
           at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
           at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
           at 
java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
           at 
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
           at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
           at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
           at 
java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
           at 
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
           at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
           at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
           at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
           at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
           at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
           at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
           at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
           at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
           at org.apache.spark.scheduler.Task.run(Task.scala:136)
           at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
           at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
           at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
           at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.reflect.InvocationTargetException
           at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:566)
           at 
java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
           at 
java.base/jdk.internal.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
           at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:566)
           at 
java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
           ... 35 more
   Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
           at 
org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.$deserializeLambda$(GroupNonMergingWindowsFunctions.java:50)
           ... 44 more
   
   ```
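
   For what it's worth, the `Invalid lambda deserialization` at the bottom of the trace is thrown by the JVM's lambda-serialization machinery, not by Beam itself: a serialized lambda is resolved through a synthetic `$deserializeLambda$` method on its capturing class (`GroupNonMergingWindowsFunctions` here), and that resolution fails when the class visible at deserialization time does not match the one that produced the bytes, typically because of a classloader mismatch. A minimal same-JVM sketch of the mechanism (class and method names below are illustrative, not from Beam):

   ```java
   import java.io.*;
   import java.util.function.Function;

   public class LambdaSerDemo {
       // A lambda is only serializable if its target interface extends Serializable.
       interface SerFn<A, B> extends Function<A, B>, Serializable {}

       static int roundTrip() {
           try {
               SerFn<String, Integer> f = String::length;

               // Serialization replaces the lambda with a SerializedLambda record
               // that names its "capturing class" (here: LambdaSerDemo).
               ByteArrayOutputStream bos = new ByteArrayOutputStream();
               try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                   out.writeObject(f);
               }

               // readResolve() on SerializedLambda invokes the capturing class's
               // synthetic $deserializeLambda$ method. If the class seen by the
               // deserializing side is not the same one that wrote the bytes
               // (e.g. loaded by a different classloader), that call throws
               // IllegalArgumentException: Invalid lambda deserialization.
               try (ObjectInputStream in =
                       new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                   @SuppressWarnings("unchecked")
                   SerFn<String, Integer> g = (SerFn<String, Integer>) in.readObject();
                   return g.apply("beam");
               }
           } catch (IOException | ClassNotFoundException e) {
               throw new RuntimeException(e);
           }
       }

       public static void main(String[] args) {
           // Within a single classloader the round trip succeeds.
           System.out.println(roundTrip());
       }
   }
   ```

   On Spark, the write and the read happen in different JVMs (driver vs. executor), so anything that makes the executor see a second copy of the class can trigger this failure.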
   
   Test job source code:
   
   ```java
   Pipeline pipeline = Pipeline.create(options);

   pipeline
       .apply(TextIO.read().from("gs://<FILE_PATH>"))
       .apply("Do something", ParDo.of(new MessageConverterFn()))
       .apply(TextIO.write().to("wordcounts"));
   ```
   
   ```java
   public static class MessageConverterFn extends DoFn<String, String> {

       private static final long serialVersionUID = 1L;

       @ProcessElement
       public void processElement(ProcessContext c) {
           String line = c.element();
           c.output(line);
       }
   }
   ```
   
   Beam version: 2.45.0
   Spark version: 3.3.0
   Java version: 11
   Dataproc image version: 2.1.4-debian11 (Spark 3.3.0, Java 11)
   
   When I replace `TextIO.write()` with simple logging to the console, the job works. I don't know where this problem comes from; the Java and Spark versions match.
   
   ```java
   .apply(Println.<String>stdout());
   ```
   
   ```java
   public static <T> Println<T> stdout() {
       return new Println<>(new DoFn<T, Void>() {

           private static final long serialVersionUID = -313060014379406773L;

           @ProcessElement
           public void processElement(@Element T element) {
               System.out.println(element);
           }
       });
   }
   ```
   
   Spark submit command:
   
   ```bash
   spark-submit --class <MY_CLASS> --master yarn --deploy-mode client \
     --jars gs://<MY_JAR>.jar \
     gs://<MY_JAR>.jar --runner=SparkRunner
   ```
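
   One thing that may be worth ruling out (an assumption on my part, not a verified fix): the same jar is passed both via `--jars` and as the application jar, so executors can end up with two copies of the Beam classes on their classpath, loaded by different classloaders, which is exactly the situation in which `$deserializeLambda$` fails. Submitting the jar only once might behave differently:

   ```bash
   # Same submit, but the jar is passed only once; spark-submit already
   # ships the application jar itself to the executors on YARN.
   spark-submit --class <MY_CLASS> --master yarn --deploy-mode client \
     gs://<MY_JAR>.jar --runner=SparkRunner
   ```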
   
   Of course, the same code works in the IDE and on a local Spark cluster on my laptop.
   Any ideas?
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [X] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

