[ 
https://issues.apache.org/jira/browse/SPARK-17888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15571476#comment-15571476
 ] 

Sean Owen commented on SPARK-17888:
-----------------------------------

What makes you believe there is a leak? if you're not running out of memory, I 
don't think there is an issue here.

> Memory leak in streaming driver when use SparkSQL in Streaming
> --------------------------------------------------------------
>
>                 Key: SPARK-17888
>                 URL: https://issues.apache.org/jira/browse/SPARK-17888
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.2
>         Environment: scala 2.10.4
> java 1.7.0_71
>            Reporter: weilin.chen
>              Labels: leak, memory
>
> Hi
>   I have a little program of spark 1.5, it receive data from a publisher in 
> spark streaming. It will process these received data with spark sql. But when 
> the time goes by I found the memory leak in driver, so i update to spark 
> 1.6.2. But, there is no change in the situation.
> here is the code:
> {quote}
>  val lines = ssc.receiverStream(new RReceiver("10.0.200.15", 6380, 
> "subresult"))
> val jsonf = 
> lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,
>  Any]])
> val logs = jsonf.map(data => LogStashV1(data("message").toString, 
> data("path").toString, data("host").toString, 
> data("lineno").toString.toDouble, data("timestamp").toString))
> logs.foreachRDD( rdd => { 
>  import sqc.implicits._
>  rdd.toDF.registerTempTable("logstash")
>  val sqlreport0 = sqc.sql("SELECT message, COUNT(message) AS host_c, 
> SUM(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND 
> lineno > 70 GROUP BY message ORDER BY host_c DESC LIMIT 100")
>  sqlreport0.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, 
> t(2).toString.toDouble)).collect().foreach(println)
> sqlreport0.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, 
> t(2).toString.toDouble)).collect().foreach(println)
>  {quote}
> jmap information:
>  {quote}
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:         34819       72711952  [B
>    2:       2297557       66010656  [C
>    3:       2296294       55111056  java.lang.String
>    4:       1063491       42539640  org.apache.spark.scheduler.AccumulableInfo
>    5:       1251001       40032032  
> scala.collection.immutable.HashMap$HashMap1
>    6:       1394364       33464736  java.lang.Long
>    7:       1102516       26460384  scala.collection.immutable.$colon$colon
>    8:       1058202       25396848  
> org.apache.spark.sql.execution.metric.LongSQLMetricValue
>    9:       1266499       20263984  scala.Some
>   10:        124052       15889104  <methodKlass>
>   11:        124052       15269568  <constMethodKlass>
>   12:         11350       12082432  <constantPoolKlass>
>   13:         11350       11692880  <instanceKlassKlass>
>   14:         96682       10828384  org.apache.spark.executor.TaskMetrics
>   15:        233481        9505896  [Lscala.collection.immutable.HashMap;
>   16:         96682        6961104  org.apache.spark.scheduler.TaskInfo
>   17:          9589        6433312  <constantPoolCacheKlass>
>   18:        233000        5592000  
> scala.collection.immutable.HashMap$HashTrieMap
>   19:         96200        5387200  
> org.apache.spark.executor.ShuffleReadMetrics
>   20:        113381        3628192  scala.collection.mutable.ListBuffer
>   21:          7252        2891792  <methodDataKlass>
>   22:        117073        2809752  scala.collection.mutable.DefaultEntry
>  {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to