Vitali Makarevich created SPARK-44459:
-----------------------------------------
Summary: Garbage collection doesn't include finalization run
Key: SPARK-44459
URL: https://issues.apache.org/jira/browse/SPARK-44459
Project: Spark
Issue Type: Improvement
Components: Structured Streaming, Web UI
Affects Versions: 3.3.0
Environment: AWS EMR 6.9
Hudi 0.12.1
Spark 3.3.0
Reporter: Vitali Makarevich
{panel:title=Problem description}
The full text with figures is available [here (4 min read)|https://medium.com/@vitaliy.makarevich.work/spark-structured-streaming-and-java-util-zip-and-finalize-method-83181c6bc86f].
I can post it here as well, but for now this is a shorter version.
When running a relatively big application (dozens of streams in parallel), the
Spark driver grows in memory up to 110 GB (at which point I stopped the test).
When I check the heap dump and the JMX finalization queue size, I see that the
JVM is struggling with an accumulation of java.lang.ref.Finalizer instances and
the objects they reference. Most of the objects in the finalization queue are
from the java.util.zip package.
{panel}
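The finalization queue size mentioned above can also be read from inside the JVM. The following is a minimal sketch using the standard `MemoryMXBean`; the class name `FinalizationQueueProbe` is made up for illustration, and the returned value is only an approximation reported by the JVM.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class FinalizationQueueProbe {
    /** Approximate number of objects whose finalize() has not run yet. */
    public static int pendingFinalizationCount() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        return memory.getObjectPendingFinalizationCount();
    }

    public static void main(String[] args) {
        // On a healthy JVM this stays small; in the scenario described
        // above it would be expected to keep growing.
        System.out.println("Objects pending finalization: " + pendingFinalizationCount());
    }
}
```

The same attribute is exposed over JMX as `ObjectPendingFinalizationCount` on the `java.lang:type=Memory` bean.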
{panel:title=Underlying Java implementation}
In a nutshell, in Java 8 every Object has a `finalize` method. If a class
overrides it with a non-empty body, an instance of that class is not reclaimed
as soon as the garbage collector finds it unused; instead it is put in the
finalizer queue. The JVM then runs the Finalizer thread, which takes each
object from the queue and runs `finalize`. The problem is that for big
applications the finalizer queue grows faster than the Finalizer thread can
drain it, given how often and at what priority that thread runs.
Very frequently, zip package instances refer to C memory (since the package is
implemented natively), so even native memory is not released until `finalize`
is called.
{panel}
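The mechanism described in the panel above can be reproduced with a small standalone program. This is only a sketch: `FinalizerDemo` and `Tracked` are hypothetical names, `System.gc()` is a hint rather than a guarantee, and the exact counts depend on the JVM and its flags.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FinalizerDemo {
    private static final AtomicInteger FINALIZED = new AtomicInteger();

    /** Hypothetical class with a non-empty finalize(), so the JVM must queue it. */
    static class Tracked {
        @Override
        @SuppressWarnings({"deprecation", "removal"})
        protected void finalize() {
            FINALIZED.incrementAndGet();
        }
    }

    /** Allocates n objects that become unreachable immediately. */
    static void allocateGarbage(int n) {
        for (int i = 0; i < n; i++) {
            new Tracked();
        }
    }

    /** Asks the JVM to collect and then finalize; retries because both calls are best effort. */
    static int drainFinalizers(int expected) {
        for (int attempt = 0; attempt < 10 && FINALIZED.get() < expected; attempt++) {
            System.gc();              // moves unreachable Tracked objects to the finalizer queue
            System.runFinalization(); // runs finalize() for objects pending finalization
        }
        return FINALIZED.get();
    }

    public static void main(String[] args) {
        allocateGarbage(1000);
        System.out.println("Finalized so far: " + drainFinalizers(1000));
    }
}
```

Without the `System.runFinalization()` call, the queued objects wait for the Finalizer thread, which is where the backlog described above builds up.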
{panel:title=Application}
The application in which I caught this runs 90 streaming queries in parallel
with a batch interval of about 1 hour. It reads data from
[Apache Hudi|https://hudi.apache.org/] and writes output to another path in
Apache Hudi (version 0.12.1). It runs on AWS EMR 6.9 on Java 8.
The Spark UI and event log are enabled with the following settings:
{code:java}
"spark.ui.enabled" = "true"
## How many jobs the Spark UI and status APIs remember before garbage collecting.
## This is a target maximum, and fewer elements may be retained in some circumstances.
## Default value: 1000
"spark.ui.retainedJobs" = "100"
## How many stages the Spark UI and status APIs remember before garbage collecting.
## This is a target maximum, and fewer elements may be retained in some circumstances.
## Default value: 1000
"spark.ui.retainedStages" = "50"
## How many tasks in one stage the Spark UI and status APIs remember before garbage collecting.
## This is a target maximum, and fewer elements may be retained in some circumstances.
## Default value: 100000
"spark.ui.retainedTasks" = "50"
## How many DAG graph nodes the Spark UI and status APIs remember before garbage collecting.
## Default value: Int.MaxValue (2^31 - 1) - Here we use 2^15 instead.
"spark.ui.dagGraph.retainedRootRDDs" = "32768"
"spark.worker.ui.retainedExecutors" = "10"
"spark.worker.ui.retainedDrivers" = "10"
"spark.sql.ui.retainedExecutions" = "10"
"spark.streaming.ui.retainedBatches" = "10"
"spark.eventLog.enabled": "true"
"spark.eventLog.rotation.enabled" : "true",
"spark.eventLog.rotation.interval" : "3600",
"spark.eventLog.rotation.minFileSize" : "1024m",
"spark.eventLog.rotation.maxFilesToRetain" : "5"
{code}
The Spark UI is the crucial factor: with it disabled, memory consumption is
fine. I have experimented with its settings, but unfortunately even very
conservative values do not help.
{panel}
{panel:title=What is the inner source of the issue}
I'm not sure about the source of the issue, but it looks like the driver is
heavily using the zip package for small pieces of data. I assume it comes from
some networking code where traffic is compressed (I saw some java.util.zip
instances coming from Netty, but once they are GCed I cannot trace them back to
their source, since they are referenced only by the finalizer queue).
{panel}
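To illustrate why java.util.zip objects matter here, the sketch below compresses and decompresses a small payload with `Deflater` and `Inflater`. It is not taken from Spark or Netty; `ZipNativeMemoryDemo` is a hypothetical name. Both classes hold native zlib state, and on Java 8 that state is released either by an explicit `end()` call or, if the caller omits it, only when `finalize` eventually runs.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class ZipNativeMemoryDemo {
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            while (!deflater.finished()) {
                int n = deflater.deflate(buffer);
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } finally {
            // Frees the native zlib state now instead of leaving it to finalize().
            deflater.end();
        }
    }

    static byte[] decompress(byte[] compressed) throws DataFormatException {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new DataFormatException("truncated or unsupported input");
                }
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } finally {
            inflater.end();
        }
    }
}
```

Code that creates many short-lived instances without calling `end()` would produce exactly the kind of finalizer backlog described in this issue.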
{panel:title=Proposed solution}
As a workaround, I've added a background service that runs
`System.runFinalization()` at the same frequency as
`spark.cleaner.periodicGC.interval`, and it works well: instead of growing
indefinitely past 100 GB, memory consumption stays stable at an acceptable
level of 60-70 GB total heap (~40 GB used), which I consider OK for such an
intensive application.
So the proposed solution is to add a `System.runFinalization()` call
[here|https://github.com/apache/spark/blob/85d8d62216d3b830cc5af3dec05422a9cda4cea0/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L131].
I don't think it has any drawback (such as reduced performance). It could also
be added as a separate service, like the current `System.gc` one, or put behind
a feature flag for compatibility.
{panel}
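A background service of the kind described in the workaround could look roughly like the sketch below. It is not the code used in the application and not a patch for `ContextCleaner`; the class name `PeriodicFinalizationRunner`, the thread name, and the run counter are assumptions made for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PeriodicFinalizationRunner implements AutoCloseable {
    private final AtomicLong runs = new AtomicLong();
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "periodic-finalization");
            t.setDaemon(true); // must not keep the driver JVM alive
            return t;
        });

    /** Starts calling System.runFinalization() once per interval. */
    public PeriodicFinalizationRunner(long interval, TimeUnit unit) {
        executor.scheduleWithFixedDelay(() -> {
            System.runFinalization();
            runs.incrementAndGet();
        }, interval, interval, unit);
    }

    /** Number of completed System.runFinalization() calls, for monitoring. */
    public long completedRuns() {
        return runs.get();
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

In the driver, such a runner would presumably be created once with the interval taken from `spark.cleaner.periodicGC.interval` and closed on shutdown.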