[ https://issues.apache.org/jira/browse/SPARK-13183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148205#comment-15148205 ]
dylanzhou edited comment on SPARK-13183 at 2/16/16 7:45 AM: ------------------------------------------------------------

There is a memory leak problem that eventually exhausts the heap with java.lang.OutOfMemoryError: Java heap space. Increasing driver memory only lets the streaming program run a little longer before it fails; as far as I can tell, the byte[] objects are never reclaimed by the GC. My question with full details is posted here; any advice would be appreciated, thank you!
http://apache-spark-user-list.1001560.n3.nabble.com/the-memory-leak-problem-of-use-sparkstreamimg-and-sparksql-with-kafka-in-spark-1-4-1-td26231.html

was (Author: dylanzhou):
The same report, with the program inline:

{code}
import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object LogAnalyzerStreamingSQL {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // 30-second micro-batches
    val ssc = new StreamingContext(sc, Seconds(30))

    val topicSet = Set("applogs")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "192.168.100.1:9092,192.168.100.2:9092,192.168.100.3:9092",
      "group.id" -> "app_group",
      "serializer.class" -> "kafka.serializer.StringEncoder")

    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet)

    kafkaStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val jsonRdd = rdd.map(_._2)
        val df = sqlContext.read.json(jsonRdd)
        df.registerTempTable("applogs")
        sqlContext.cacheTable("applogs")

        // Calculate statistics based on the content size.
        sqlContext
          .sql("SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize) FROM applogs")
          .show()

        // Compute response code to count.
        val responseCodeToCount = sqlContext
          .sql("SELECT responseCode, COUNT(*) FROM applogs GROUP BY responseCode")
          .map(row => (row.getInt(0), row.getLong(1)))
          .collect()

        // Any IP address that has accessed the server more than 10 times.
        val ipAddresses = sqlContext
          .sql("SELECT ipAddress, COUNT(*) AS total FROM applogs GROUP BY ipAddress HAVING total > 10")
          .map(row => row.getString(0))
          .take(100)

        val topEndpoints = sqlContext
          .sql("SELECT endpoint, COUNT(*) AS total FROM applogs GROUP BY endpoint ORDER BY total DESC LIMIT 10")
          .map(row => (row.getString(0), row.getLong(1)))
          .collect()

        // ... many more SQL queries like the ones above ...

        sqlContext.uncacheTable("applogs")
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}
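One thing worth ruling out: if any of the queries above throws, the trailing uncacheTable call is skipped and that batch's cached columnar buffers stay referenced until the context shuts down. A minimal defensive sketch, using only the APIs already shown above (a workaround idea, not a confirmed fix for the leak):

{code}
// Sketch of a defensive variant: guarantee the per-batch cache is
// dropped even if one of the SQL queries fails. Uses sqlContext and
// kafkaStream from the program above; not a confirmed fix.
kafkaStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val df = sqlContext.read.json(rdd.map(_._2))
    df.registerTempTable("applogs")
    sqlContext.cacheTable("applogs")
    try {
      sqlContext.sql("SELECT COUNT(*) FROM applogs").show()
      // ... the rest of the per-batch queries ...
    } finally {
      // Always release the cached columnar buffers before the next batch.
      sqlContext.uncacheTable("applogs")
    }
  }
}
{code}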
> Bytebuffers occupy a large amount of heap memory
> ------------------------------------------------
>
>                 Key: SPARK-13183
>                 URL: https://issues.apache.org/jira/browse/SPARK-13183
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.1
>            Reporter: dylanzhou
>
> When I use Spark Streaming with Spark SQL and cache the table, the old
> generation grows very fast and full GCs become very frequent; after running
> for a while the driver runs out of memory. Analysis of the heap dump shows a
> large number of org.apache.spark.sql.columnar.ColumnBuilder[38] @ 0xd022a0b8
> instances taking up 90% of the space; looking at the source, these are backed
> by HeapByteBuffer. I don't know why these objects are never released and just
> sit there waiting for GC to collect them. If I do not cache the table the
> problem does not occur, but I need to query this table repeatedly.
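A hedged diagnostic sketch for anyone reproducing this, using only the public SparkContext API (sc and kafkaStream refer to the program above): after each batch, list what is still registered as persistent. If entries accumulate across batches even though uncacheTable is called, the in-memory columnar ByteBuffers backing them are the likely source of the old-generation growth.

{code}
// Diagnostic sketch, not a fix: after each batch, print what is still
// registered as persistent on the driver. sc and kafkaStream are assumed
// to be the SparkContext and DStream from the program above.
kafkaStream.foreachRDD { rdd =>
  // ... per-batch queries as in the program above ...

  // getPersistentRDDs returns a Map[Int, RDD[_]] of everything still cached.
  val persistent = sc.getPersistentRDDs
  println(s"persistent RDDs after batch: ${persistent.size}")
  persistent.foreach { case (id, r) =>
    println(s"  id=$id storage=${r.getStorageLevel.description}")
  }
}
{code}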