[
https://issues.apache.org/jira/browse/BEAM-14099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508989#comment-17508989
]
Alexey Romanenko edited comment on BEAM-14099 at 3/18/22, 6:06 PM:
-------------------------------------------------------------------
Actually, this issue is caused by the way Apache Spark handles the lineage of
its pipelines. To make execution fault tolerant, the driver process keeps the
whole DAG, and serialising it recurses through every transform in the lineage,
which can take a significant amount of stack memory for large pipelines with
many transforms that don't materialise or checkpoint intermediate results. So
it's not really a Beam Spark Runner problem, but rather a peculiarity of Spark
itself.
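The stack trace below shows the mechanism: Java serialisation walks an object
graph recursively, using a set of stack frames per nested object, so serialising
a lineage whose depth grows with the number of transforms eventually overflows
the driver's thread stack. A minimal, self-contained illustration of that
mechanism (plain Java, not Spark code; the {{Node}} class and the chain depth
are made up for the example):
{code}
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DeepChainSerialization {

  // A trivially deep object graph, standing in for a long RDD lineage.
  static class Node implements Serializable {
    final Node next;
    Node(Node next) { this.next = next; }
  }

  public static void main(String[] args) throws Exception {
    Node head = null;
    for (int i = 0; i < 100_000; i++) {
      head = new Node(head); // each link adds one level of nesting
    }
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      // ObjectOutputStream recurses once per linked Node, so a deep enough
      // chain throws java.lang.StackOverflowError, as in the trace below.
      out.writeObject(head);
    }
  }
}
{code}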
Some workarounds can be used to overcome this issue:
- increase the {{-Xss}} stack size option for the Spark driver process (e.g.
pass {{-Xss16m}} via {{spark.driver.extraJavaOptions}})
- materialise the results from time to time with an empty side input and
{{Reshuffle}}, as it's done in
[JdbcIO.Reparallelize|https://github.com/apache/beam/blob/d7f7f8e587a56fbc7fbfbdd8b010a8f6991234ee/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L2084]
(see the sketch below)
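For the second workaround, here is a minimal sketch of such a transform,
modelled on the linked {{JdbcIO.Reparallelize}} code (the class name
{{ReparallelizeSketch}} is just for illustration). Consuming an empty side
input forces the runner to materialise the input, and
{{Reshuffle.viaRandomKey()}} cuts the lineage at that point:
{code}
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class ReparallelizeSketch<T> extends PTransform<PCollection<T>, PCollection<T>> {
  @Override
  public PCollection<T> expand(PCollection<T> input) {
    // An empty side input: "Consume" drops every element, so the view holds
    // nothing, but depending on it still forces the input to be materialised.
    PCollectionView<Iterable<T>> empty =
        input
            .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
            .apply(View.asIterable());
    PCollection<T> materialized =
        input.apply(
            "Identity",
            ParDo.of(
                    new DoFn<T, T>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        c.output(c.element());
                      }
                    })
                .withSideInputs(empty));
    // Reshuffle breaks the lineage so downstream stages start from a fresh DAG.
    return materialized.apply(Reshuffle.viaRandomKey());
  }
}
{code}
In the repro below, applying this transform every few dozen filters (e.g.
{{if (i % 50 == 0) words = words.apply(new ReparallelizeSketch<>());}}) should
keep the lineage depth bounded.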
> Pipeline with large number of PTransforms fails with StackOverflowError
> ------------------------------------------------------------------------
>
> Key: BEAM-14099
> URL: https://issues.apache.org/jira/browse/BEAM-14099
> Project: Beam
> Issue Type: Bug
> Components: runner-spark, sdk-java-core
> Affects Versions: 2.37.0
> Reporter: Alexey Romanenko
> Assignee: Alexey Romanenko
> Priority: P1
> Fix For: Not applicable
>
> Attachments: BEAM-14099_spark.log
>
>
> If a pipeline written with the Java SDK contains a large number of
> PTransforms, it fails with a {{java.lang.StackOverflowError}}.
> Code snippet to reproduce (based on the WordCount example):
> {code}
> import java.util.Arrays;
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.TextIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Count;
> import org.apache.beam.sdk.transforms.Filter;
> import org.apache.beam.sdk.transforms.FlatMapElements;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.TypeDescriptors;
>
> public class WordCountWithNFilters {
>
>   private static final int N = 100;
>
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
>     Pipeline p = Pipeline.create(options);
>     PCollection<String> words =
>         p.apply(TextIO.read().from("file://tmp/input.txt"))
>             .apply(
>                 FlatMapElements.into(TypeDescriptors.strings())
>                     .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))));
>     // Chain N no-op filters; each one deepens the pipeline's lineage.
>     for (int i = 0; i < N; i++) {
>       words = words.apply(Filter.by((String word) -> !word.isEmpty()));
>     }
>     words
>         .apply(Count.perElement())
>         .apply(
>             MapElements.into(TypeDescriptors.strings())
>                 .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
>         .apply(TextIO.write().to("wordcounts"));
>     p.run().waitUntilFinish();
>   }
> }
> {code}
> Log while running with SparkRunner:
> {code}
> 2022-03-14 19:01:30,465 [pool-3-thread-1] INFO org.apache.beam.runners.spark.SparkRunner$Evaluator - Evaluating View.CreatePCollectionView
> [WARNING]
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.StackOverflowError
>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom (SparkPipelineResult.java:73)
>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish (SparkPipelineResult.java:104)
>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish (SparkPipelineResult.java:92)
>     at org.apache.beam.samples.sql.WordCountWithNFilters.main (WordCountWithNFilters.java:39)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.lang.StackOverflowError
>     at java.lang.ReflectiveOperationException.<init> (ReflectiveOperationException.java:89)
>     at java.lang.reflect.InvocationTargetException.<init> (InvocationTargetException.java:72)
>     at sun.reflect.GeneratedMethodAccessor39.invoke (Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke (Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteReplace (ObjectStreamClass.java:1244)
>     at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1136)
>     at java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348)
>     at scala.collection.immutable.List$SerializationProxy.writeObject (List.scala:479)
>     at sun.reflect.GeneratedMethodAccessor40.invoke (Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
>     ...
> {code}
> It seems that the minimal {{N}} needed to trigger the error depends on the
> environment configuration (e.g. the JVM thread stack size).