[
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15459021#comment-15459021
]
Joao Duarte edited comment on SPARK-17381 at 9/2/16 4:49 PM:
-------------------------------------------------------------
Hi Sean.
Thanks for commenting. I set it to Blocker because I can't run the app in a
production environment.
I am not explicitly adding accumulators in my code. I am trying to replicate
the issue with simpler code so that I can post it here, but I have been unable
to do so thus far.
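For anyone who wants to try the toy snippet from the issue description without
a Kinesis stream, a minimal local harness could look like the sketch below.
Everything around the aggregation is an assumption made for illustration: the
object name SqlTaskMetricsRepro, the helper toyAggregation, the local[2]
master, the batch sizes, and the use of queueStream as a stand-in for
ssc.union(kinesisStreams). It has not been confirmed to reproduce the leak.

```scala
import scala.collection.mutable

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical harness; only the aggregation mirrors the issue description.
object SqlTaskMetricsRepro {

  // Same aggregation as the toy snippet: build a small DataFrame per batch
  // and sum one column on it.
  def toyAggregation(spark: SparkSession, streamInfo: RDD[Array[Byte]]): Any = {
    import spark.implicits._
    val toyDF = streamInfo
      .map(_ => (1, "data", "more data "))
      .toDF("Num", "Data", "MoreData")
    toyDF.agg(sum("Num")).first().get(0)
  }

  def main(args: Array[String]): Unit = {
    // Master and app name are arbitrary choices for this sketch.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-17381-repro")
      .getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

    // Stand-in for the Kinesis input: each queued RDD becomes one batch.
    val batches = mutable.Queue[RDD[Array[Byte]]]()
    (1 to 100).foreach { _ =>
      batches += spark.sparkContext.parallelize(Seq.fill(1000)(Array[Byte](1, 2, 3)))
    }
    val dstream = ssc.queueStream(batches, oneAtATime = true)

    dstream.foreachRDD { (streamInfo: RDD[Array[Byte]]) =>
      toyAggregation(spark, streamInfo)
      ()
    }

    // Runs until killed, so the driver heap can be inspected over time.
    ssc.start()
    ssc.awaitTermination()
  }
}
```

While this runs, a driver heap dump can be checked for a growing number of
org.apache.spark.sql.execution.ui.SQLTaskMetrics instances.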
Basically, for each row of the DStream I collect a lot of data from different
sources, transform the data into a DataFrame and run some ML Pipelines. Again,
I don't explicitly collect data or use accumulators. I started stripping down
the code by removing all ML Pipelines, and now I only have some functions that
I run in RDD maps to collect the data I need (so they run in the executors),
transform that data, create a DataFrame and then perform an aggregation (just
a dummy sum of a column). When I inspect the driver's heap, some of the data I
loaded in the executors is there.
The driver's heap "path" that contains the "unwanted" data is
objSQLTaskMetrics.accumulatorUpdates[2]._2 where:
- objSQLTaskMetrics is one of the several SQLTaskMetrics instances
- _2 is a Collections$UnmodifiableRandomAccessList of size 1, which
contains the GenericInternalRow with the "unwanted" data.
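Since SQLTaskMetrics lives in the SQL UI package
(org.apache.spark.sql.execution.ui), one thing that may be worth trying while
the root cause is investigated is lowering how much history the driver-side UI
retains. The sketch below only sets existing Spark configuration keys; the
values are illustrative assumptions, and this is not a confirmed fix for this
issue.

```scala
import org.apache.spark.SparkConf

// Illustrative values only; the Spark default for each of these keys is 1000.
val conf = new SparkConf()
  // Number of finished SQL executions the UI keeps on the driver.
  .set("spark.sql.ui.retainedExecutions", "50")
  // Number of jobs and stages the UI keeps on the driver.
  .set("spark.ui.retainedJobs", "200")
  .set("spark.ui.retainedStages", "200")
```

If the amount of retained SQLTaskMetrics data in a driver heap dump drops after
this change, that would suggest the data is being held by the UI's retained
history and is not an unbounded leak.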
> Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
> -------------------------------------------------------------
>
> Key: SPARK-17381
> URL: https://issues.apache.org/jira/browse/SPARK-17381
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Environment: EMR 5.0.0 (submitted as yarn-client)
> Java Version 1.8.0_101 (Oracle Corporation)
> Scala Version version 2.11.8
> Problem also happens when I run locally with similar versions of java/scala.
> OS: Ubuntu 16.04
> Reporter: Joao Duarte
>
> I am running a Spark Streaming application that reads from a Kinesis stream.
> After some hours of running, it runs out of memory. From a driver heap dump I
> found two problems:
> 1) a huge number of org.apache.spark.sql.execution.ui.SQLTaskMetrics objects
> (it seems this was a problem before:
> https://issues.apache.org/jira/browse/SPARK-11192).
> To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak, I just
> needed to run the code below:
> {code}
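> // Assumes an existing StreamingContext `ssc`, a SparkSession `spark`, and
> // `kinesisStreams`, a sequence of DStream[Array[Byte]].
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.functions.sum
> import spark.implicits._
>
> val dstream = ssc.union(kinesisStreams)
> dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
>   // load data
>   val toyDF = streamInfo
>     .map(_ => (1, "data", "more data "))
>     .toDF("Num", "Data", "MoreData")
>   // dummy aggregation: sum one column and read the result on the driver
>   toyDF.agg(sum("Num")).first().get(0)
> })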
> {code}
> 2) a huge amount of Array[Byte] (9 GB+)
> After some analysis, I noticed that most of the Array[Byte] instances were
> referenced by objects that were in turn referenced by SQLTaskMetrics. The
> strangest thing is that those Array[Byte] instances were basically text that
> was loaded in the executors, so they should never be in the driver at all!
> I still could not replicate the 2nd problem with simple code (the original
> was complex, with data coming from S3, DynamoDB and other databases). However,
> when I debug the application I can see that in Executor.scala, during
> reportHeartBeat(), the data that should not be sent to the driver is being
> added to "accumUpdates" which, as I understand, will be sent to the driver
> for reporting.
> To be more precise, one of the taskRunners in the loop "for (taskRunner <-
> runningTasks.values().asScala)" contains a GenericInternalRow with a lot of
> data that should not go to the driver. In my case the path would be:
> taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if
> not identical) to the data I see when I do a driver heap dump.
> I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is
> fixed I would have less of this undesirable data in the driver and I could
> run my streaming app for a long period of time, but I think there will always
> be some performance loss.