[ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16034206#comment-16034206
 ] 

Patrick Brown commented on SPARK-20760:
---------------------------------------

We have been seeing the same issue on a Spark 2.0.2 cluster on YARN for a 
while now.

> Memory Leak of RDD blocks 
> --------------------------
>
>                 Key: SPARK-20760
>                 URL: https://issues.apache.org/jira/browse/SPARK-20760
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager
>    Affects Versions: 2.1.0
>         Environment: Spark 2.1.0
>            Reporter: Binzi Cao
>         Attachments: RDD Blocks .png
>
>
> RDD blocks leak memory in a long-running RDD process.
> We have a long-running application that performs computations on RDDs, 
> and we found that the number of RDD blocks keeps increasing on the Spark UI 
> page. The RDD blocks and memory usage do not match the cached RDDs and 
> memory. It looks like Spark keeps old RDDs in memory and never releases 
> them, or never gets a chance to release them. The job eventually dies with 
> an out-of-memory error. 
> In addition, I'm not seeing this issue in Spark 1.6. We are seeing the same 
> issue in YARN cluster mode in both Kafka streaming and batch applications. 
> The issue in streaming is similar; however, the RDD blocks seem to grow a 
> bit more slowly than in batch jobs. 
> The sample code below reproduces the issue when simply run in local mode. 
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
> case class Person(id: String, name: String)
> object RDDApp {
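>   def run(sc: SparkContext) = {
>     while (true) {
>       // Build a small random data set and cache it as an RDD.
>       val r = scala.util.Random
>       val data = (1 to r.nextInt(100)).toList.map { a =>
>         Person(a.toString, a.toString)
>       }
>       val rdd = sc.parallelize(data)
>       rdd.cache()
>       println("running")
>       // Submit 100 concurrent jobs against the cached RDD.
>       val a = (1 to 100).toList.map { x =>
>         Future(rdd.filter(_.id == x.toString).collect)
>       }
>       // Wait for every job to finish before releasing the cache.
>       a.foreach { f =>
>         println(Await.ready(f, Duration.Inf).value.get)
>       }
>       // Unpersist the RDD; its blocks are expected to be freed here.
>       rdd.unpersist()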
>     }
>   }
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("test")
>     val sc   = new SparkContext(conf)
>     run(sc)
>   }
> }
> {code}
> build.sbt file:
> {code}
> name := "RDDTest"
> version := "0.1.1"
> scalaVersion := "2.11.5"
> libraryDependencies ++= Seq (
>     "org.scalaz" %% "scalaz-core" % "7.2.0",
>     "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
>     "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
>     "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
>   )
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
> mainClass in assembly := Some("RDDApp")
> test in assembly := {}
> {code}
> To reproduce it, just run: 
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
> --executor-memory 4G \
> --executor-cores 1 \
> --num-executors 1 \
> --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
