pwiecek opened a new issue, #25608:
URL: https://github.com/apache/beam/issues/25608
### What happened?
Hello,
I wanted to run a simple Beam batch pipeline as a Spark job on a Dataproc
cluster. Unfortunately, there is a problem with lambda deserialization:
```java
23/02/23 11:50:15 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.View$ToListViewDoFn@14b48f39
23/02/23 11:50:15 INFO SparkRunner$Evaluator: Evaluating View.CreatePCollectionView
23/02/23 11:50:20 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) (beam-spark-poc-XXXXXXXXXXXXXXXXXXXXXXXXXXXX executor 1): java.io.IOException: unexpected exception type
    at java.base/java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1512)
    at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1142)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2237)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1136)
    ... 35 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.$deserializeLambda$(GroupNonMergingWindowsFunctions.java:50)
    ... 44 more
```
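For background on the failing frame: when a serializable lambda is written, Java serialization stores a `SerializedLambda` that records the capturing class and the lambda's implementation signature. At read time, `SerializedLambda.readResolve` reflectively invokes the synthetic `$deserializeLambda$` method of the capturing class (`GroupNonMergingWindowsFunctions` here), and that method throws `IllegalArgumentException: Invalid lambda deserialization` when the recorded signature matches no lambda in the class bytes actually loaded on the deserializing side — typically a sign that the executor loaded a different version of the class than the one that serialized the lambda. A minimal self-contained sketch of the round trip (the `SerFn` interface and class name are illustrative, not from Beam):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaRoundTrip {
    // A lambda is only serializable if its target interface is; the compiler
    // then emits a $deserializeLambda$ method in this (capturing) class.
    interface SerFn<A, B> extends Function<A, B>, Serializable {}

    public static void main(String[] args) throws Exception {
        SerFn<String, Integer> fn = String::length;

        // Serialize: what goes on the wire is a SerializedLambda recording
        // the capturing class (LambdaRoundTrip) and the method signature.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(fn);
        }

        // Deserialize: SerializedLambda.readResolve reflectively calls
        // LambdaRoundTrip.$deserializeLambda$. If this JVM had loaded
        // different bytes for LambdaRoundTrip than the serializing side,
        // this is where "Invalid lambda deserialization" would be thrown.
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            SerFn<String, Integer> back = (SerFn<String, Integer>) ois.readObject();
            System.out.println(back.apply("hello")); // prints 5
        }
    }
}
```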
Test job source code:
```java
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply(TextIO.read().from("gs://<FILE_PATH>"))
    .apply("Do something", ParDo.of(new MessageConverterFn()))
    .apply(TextIO.write().to("wordcounts"));
```
```java
public static class MessageConverterFn extends DoFn<String, String> {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(ProcessContext c) {
    String line = c.element();
    c.output(line);
  }
}
```
Beam version: 2.45.0
Spark version: 3.3.0
Java version: 11
Dataproc image version: 2.1.4-debian11 (Spark 3.3.0, Java 11)
When I replace `TextIO.write()` with simple logging to stdout, the job works. I don't know where this problem comes from; the Java and Spark versions match.
```java
.apply(Println.<String>stdout());
```
```java
public static <T> Println<T> stdout() {
  return new Println<>(new DoFn<T, Void>() {
    private static final long serialVersionUID = -313060014379406773L;

    @ProcessElement
    public void processElement(@Element T element) {
      System.out.println(element);
    }
  });
}
```
Spark submit command:
```bash
spark-submit --class <MY_CLASS> --master yarn --deploy-mode client \
  --jars gs://<MY_JAR>.jar \
  gs://<MY_JAR>.jar --runner=SparkRunner
```
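One detail worth checking in the command above: the application jar is also passed via `--jars`, so its classes can end up on the executor classpath twice and be loaded by different classloaders — exactly the kind of class-identity mismatch that breaks `$deserializeLambda$`. A hedged sketch of a submission that avoids the duplication (placeholders kept from the report; `spark.{driver,executor}.userClassPathFirst` are real Spark options, but whether they are needed here is an assumption):

```shell
# Sketch only: ship the application jar once (no duplicate --jars entry),
# and optionally prefer user-supplied classes over the cluster's bundled
# ones. <MY_CLASS> and <MY_JAR> are placeholders from the report.
spark-submit \
  --class <MY_CLASS> \
  --master yarn \
  --deploy-mode client \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  gs://<MY_JAR>.jar \
  --runner=SparkRunner
```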
Of course, the same code works in the IDE and on a Spark cluster running locally on my laptop.
Any ideas?
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [X] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]