[ 
https://issues.apache.org/jira/browse/BEAM-14099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508989#comment-17508989
 ] 

Alexey Romanenko edited comment on BEAM-14099 at 3/18/22, 6:06 PM:
-------------------------------------------------------------------

Actually, this issue is caused by the way Apache Spark handles the lineage of 
its pipelines. To make them fault tolerant, the driver process keeps the whole 
DAG on the stack, and this can consume quite a significant amount of stack 
memory for large pipelines with many transforms that don't materialise or 
checkpoint intermediate results along the way. So it's mostly not a Beam Spark 
Runner problem, but rather a particularity of Spark itself.
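The mechanism can be reproduced outside of Spark entirely: Java serialization 
recurses once per object link, so serializing a long parent-linked chain (one 
node per transform, the way each RDD references its parent in the lineage) 
overflows the stack once the chain is deep enough. A minimal sketch — the class 
name, helper, and depth values below are illustrative, not taken from the issue:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class DeepLineageDemo {
    // One node per "transform"; each keeps a reference to its parent,
    // analogous to each RDD referencing its parent in Spark's lineage.
    static class Node implements Serializable {
        final Node parent;
        Node(Node parent) { this.parent = parent; }
    }

    // Returns true if serializing a parent-chain of the given depth blows the
    // stack: ObjectOutputStream descends several frames per object level.
    static boolean overflows(int depth) {
        Node chain = null;
        for (int i = 0; i < depth; i++) {
            chain = new Node(chain);  // build the chain iteratively (no recursion here)
        }
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(chain);   // recursion happens here, once per node
            return false;
        } catch (StackOverflowError e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("depth 10: " + (overflows(10) ? "overflow" : "ok"));
        System.out.println("depth 1000000: "
            + (overflows(1_000_000) ? "overflow" : "ok"));
    }
}
```

A shallow chain serializes fine, while a deep one fails with the default 
{{-Xss}}, which is why raising the driver's stack size only moves the threshold 
rather than removing it.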

Some workarounds can be used to overcome this issue:
- increase the {{-Xss}} option for the Spark driver process
- materialise the results from time to time with an empty side input and 
{{Reshuffle}}, as is done in 
[JdbcIO.Reparallelize|https://github.com/apache/beam/blob/d7f7f8e587a56fbc7fbfbdd8b010a8f6991234ee/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L2084]
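The second workaround can be sketched against the reproduction loop from the 
issue below. The checkpoint interval of 20 is an arbitrary illustrative value, 
and {{Reshuffle.viaRandomKey()}} is used here as a simple stand-in for the 
empty-side-input trick in {{JdbcIO.Reparallelize}}; this fragment assumes the 
surrounding {{words}} / {{N}} definitions from the snippet in the issue:

```java
// Sketch only: cap the lineage depth by forcing a materialisation
// every 20 transforms. The interval is arbitrary; tune it per pipeline.
for (int i = 0; i < N; i++) {
  words = words.apply(Filter.by((String word) -> !word.isEmpty()));
  if (i % 20 == 19) {
    // Forces a shuffle, so Spark's lineage is cut at this point.
    words = words.apply(Reshuffle.viaRandomKey());
  }
}
```

This keeps any single lineage chain short enough that the driver never has to 
recurse over the full pipeline at once.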



> Pipeline with large number of PTransforms fails with StackOverflowError 
> ------------------------------------------------------------------------
>
>                 Key: BEAM-14099
>                 URL: https://issues.apache.org/jira/browse/BEAM-14099
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark, sdk-java-core
>    Affects Versions: 2.37.0
>            Reporter: Alexey Romanenko
>            Assignee: Alexey Romanenko
>            Priority: P1
>             Fix For: Not applicable
>
>         Attachments: BEAM-14099_spark.log
>
>
> If a pipeline written in the Java SDK contains a large number of PTransforms, 
> it fails with a {{java.lang.StackOverflowError}}.
> Code snippet to reproduce (based on the WordCount example):
> {code}
> public class WordCountWithNFilters {
>   private static final int N = 100;
>   public static void main(String[] args) {
>     PipelineOptions options =
>         PipelineOptionsFactory.fromArgs(args).withValidation().create();
>     Pipeline p = Pipeline.create(options);
>     PCollection<String> words =
>         p.apply(TextIO.read().from("file://tmp/input.txt"))
>             .apply(
>                 FlatMapElements.into(TypeDescriptors.strings())
>                     .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))));
>     for (int i = 0; i < N; i++) {
>       words = words.apply(Filter.by((String word) -> !word.isEmpty()));
>     }
>     words.apply(Count.perElement())
>         .apply(
>             MapElements.into(TypeDescriptors.strings())
>                 .via(
>                     (KV<String, Long> wordCount) ->
>                         wordCount.getKey() + ": " + wordCount.getValue()))
>         .apply(TextIO.write().to("wordcounts"));
>     p.run().waitUntilFinish();
>   }
> }
> {code}
> Log while running with SparkRunner:
> {code}
> 2022-03-14 19:01:30,465 [pool-3-thread-1] INFO  org.apache.beam.runners.spark.SparkRunner$Evaluator  - Evaluating View.CreatePCollectionView
> [WARNING] 
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.StackOverflowError
>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom (SparkPipelineResult.java:73)
>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish (SparkPipelineResult.java:104)
>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish (SparkPipelineResult.java:92)
>     at org.apache.beam.samples.sql.WordCountWithNFilters.main (WordCountWithNFilters.java:39)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.StackOverflowError
>     at java.lang.ReflectiveOperationException.<init> (ReflectiveOperationException.java:89)
>     at java.lang.reflect.InvocationTargetException.<init> (InvocationTargetException.java:72)
>     at sun.reflect.GeneratedMethodAccessor39.invoke (Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteReplace (ObjectStreamClass.java:1244)
>     at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1136)
>     at java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348)
>     at scala.collection.immutable.List$SerializationProxy.writeObject (List.scala:479)
>     at sun.reflect.GeneratedMethodAccessor40.invoke (Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
> ...
> {code}
> It seems that the value of {{N}} needed to trigger the error depends on the 
> environment configuration.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
