[
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618098#comment-16618098
]
Sandish Kumar HN commented on SPARK-20760:
------------------------------------------
I do see this issue in Spark 2.1.1 & 2.2.0, and I was able to replicate it
with the above code snippets.
> Memory Leak of RDD blocks
> --------------------------
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
> Issue Type: Bug
> Components: Block Manager
> Affects Versions: 2.1.0
> Environment: Spark 2.1.0
> Reporter: Binzi Cao
> Priority: Major
> Attachments: RDD Blocks .png, RDD blocks in spark 2.1.1.png, Storage
> in spark 2.1.1.png
>
>
> Memory leak of RDD blocks in a long-running RDD process.
> We have a long-running application that repeatedly computes RDDs, and we
> found that the number of RDD blocks keeps increasing on the Spark UI page.
> The RDD block count and memory usage do not match the cached RDDs and their
> memory. It looks like Spark keeps old RDDs in memory and never releases
> them, or never gets a chance to release them. The job eventually dies of an
> out-of-memory error.
> In addition, I'm not seeing this issue in Spark 1.6. We are seeing the same
> issue in YARN cluster mode in both Kafka streaming and batch applications.
> The behaviour in streaming is similar, although the RDD block count grows a
> bit more slowly than in batch jobs.
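> One way to cross-check the UI numbers from the driver is to ask the
> SparkContext which RDDs it still considers persisted (a minimal sketch using
> the public getPersistentRDDs API; if unpersist worked, the count should stay
> close to the number of currently cached RDDs):
> {code}
> import org.apache.spark.SparkContext
> // Compare the driver's bookkeeping with the block count shown in the UI.
> def reportPersisted(sc: SparkContext): Unit = {
>   val persisted = sc.getPersistentRDDs // Map of rddId -> RDD[_]
>   println(s"driver tracks ${persisted.size} persistent RDD(s): " +
>     persisted.keys.mkString(", "))
> }
> {code}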
> The sample code below reproduces the issue when simply run in local mode.
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
> case class Person(id: String, name: String)
> object RDDApp {
>   def run(sc: SparkContext) = {
>     while (true) {
>       val r = scala.util.Random
>       val data = (1 to r.nextInt(100)).toList.map { a =>
>         Person(a.toString, a.toString)
>       }
>       val rdd = sc.parallelize(data)
>       rdd.cache
>       println("running")
>       val a = (1 to 100).toList.map { x =>
>         Future(rdd.filter(_.id == x.toString).collect)
>       }
>       a.foreach { f =>
>         println(Await.ready(f, Duration.Inf).value.get)
>       }
>       rdd.unpersist()
>     }
>   }
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("test")
>     val sc = new SparkContext(conf)
>     run(sc)
>   }
> }
> {code}
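> A variant of the loop tail that makes the blocking semantics of unpersist
> explicit (a diagnostic sketch, not a confirmed fix; blocking already defaults
> to true in this Spark version):
> {code}
> // Wait until block removal completes before the next iteration, then log
> // what the driver still believes is persisted.
> rdd.unpersist(blocking = true)
> println(s"still tracked after unpersist: ${sc.getPersistentRDDs.size} RDD(s)")
> {code}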
> build.sbt file:
> {code}
> name := "RDDTest"
> version := "0.1.1"
> scalaVersion := "2.11.5"
> libraryDependencies ++= Seq(
>   "org.scalaz" %% "scalaz-core" % "7.2.0",
>   "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
>   "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
>   "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
> )
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
> mainClass in assembly := Some("RDDApp")
> test in assembly := {}
> {code}
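> The build above uses the assembly keys, so it presumably relies on the
> sbt-assembly plugin; a plausible project/plugins.sbt (the plugin version is
> an assumption) would be:
> {code}
> // project/plugins.sbt -- assumed; provides the `assembly` keys used above
> addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
> {code}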
> To reproduce it, just run:
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit --driver-memory 4G \
>   --executor-memory 4G \
>   --executor-cores 1 \
>   --num-executors 1 \
>   --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}
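> While the job runs, the growth is visible on the Storage tab of the local UI
> (http://localhost:4040/storage/); it can also be polled through the
> monitoring REST API. A minimal sketch (the application id below is a
> placeholder; in-process it is available as sc.applicationId):
> {code}
> import scala.io.Source
> // Poll the REST API of the local UI for the list of stored RDDs.
> val appId = "local-1495000000000" // placeholder; use sc.applicationId
> val url = s"http://localhost:4040/api/v1/applications/$appId/storage/rdd"
> println(Source.fromURL(url).mkString) // JSON array of currently stored RDDs
> {code}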