Hi Beam,
We are running a Beam pipeline on Spark. The pipeline is mainly structured like
this:
pipeline
    .apply(AvroIO
        .readGenericRecords(schema)
        .from(/* an input directory */))
    .apply(ParDo.of(
        new DoFn<GenericRecord, SomeType>() {
          @DoFn.ProcessElement
          public void process(ProcessContext c) {
            // convert the GenericRecord to another type
          }
        }))
    .setCoder(/* coder for SomeType */)
    .apply(/* write to a file */);
It ran well before, but after we upgraded to Beam 2.26.0 the job's memory
usage increased a lot and we hit the GC overhead limit exception:
Container exited with a non-zero exit code 52. Error file: prelaunch.err.
13-01-2021 12:25:32 PST INFO - Last 4096 bytes of prelaunch.err :
13-01-2021 12:25:32 PST INFO - Last 4096 bytes of stderr :

        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.HashMap.newNode(HashMap.java:1747)
        at java.util.HashMap.putVal(HashMap.java:642)
        at java.util.HashMap.put(HashMap.java:612)
        at org.apache.avro.generic.GenericDatumReader.addToMap(GenericDatumReader.java:275)
        at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:256)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
        at org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:647)
        at org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:212)
        at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:487)
        at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:258)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:347)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:312)
        at org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.tryClaim(RestrictionTrackers.java:59)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:298)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn.process(SplittableParDoNaiveBounded.java:309)
        at org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
        at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
Before upgrading the Beam version, peak JVM memory usage was less than 1 GB.
After upgrading, peak JVM memory usage can be around 6 GB, which is too large
for such a simple job.
We temporarily worked around the problem by setting
--experiments=use_deprecated_read, so we suspect the new splittable DoFn
(SDF) based read is related to this large memory usage.
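For reference, this is roughly how we pass the workaround flag when submitting the job. The jar and class names below are placeholders, not our real ones; only the --experiments flag itself is the relevant part:

```
# Illustrative spark-submit invocation; jar/class names are placeholders.
# --experiments=use_deprecated_read tells Beam to use the old Read
# translation instead of the new splittable-DoFn-based one.
spark-submit \
  --class com.example.MyBeamJob \
  my-beam-job.jar \
  --runner=SparkRunner \
  --experiments=use_deprecated_read
```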
We want to report this issue to see whether there is a better fix for this
memory increase.
Thanks,
Yuhong