Re: spark streaming executors memory increasing and executor killed by yarn

2017-03-20 Thread darin
This issue on Stack Overflow may help:

https://stackoverflow.com/questions/42641573/why-does-memory-usage-of-spark-worker-increases-with-time/42642233#42642233






Re: spark streaming executors memory increasing and executor killed by yarn

2017-03-18 Thread Bill Schwanitz
I have had similar issues with some of my Spark jobs, especially when doing 
things like repartitioning.


spark.yarn.driver.memoryOverhead (default: driverMemory * 0.10, with a minimum 
of 384): The amount of off-heap memory (in megabytes) to be allocated per 
driver in cluster mode. This is memory that accounts for things like VM 
overheads, interned strings, other native overheads, etc. This tends to 
grow with the container size (typically 6-10%).


I bumped the overhead memory as a way to work around the issue. Not sure 
if that is the best way, but it's how I got around it ;)
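
For example, raising the executor-side overhead on the submit command might look like the snippet below, applied to the submit script from the original post; the 768 MB value is only illustrative and would need tuning for the workload:

```
spark-submit --master yarn-cluster \
  --num-executors 6 --executor-cores 3 --executor-memory 3G \
  --conf "spark.yarn.executor.memoryOverhead=768" \
  --class com.dtise.data.streaming.ad.DTStreamingStatistics \
  hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar
```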





Re: spark streaming executors memory increasing and executor killed by yarn

2017-03-17 Thread darin
I added this code in the foreachRDD block:
```
rdd.persist(StorageLevel.MEMORY_AND_DISK)
```


The exception no longer occurs, but many dead executors show up in the Spark
Streaming UI:
```
User class threw exception: org.apache.spark.SparkException: Job aborted due
to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
```
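
If the RDDs persisted inside foreachRDD are never released, cached blocks from earlier batches can keep accumulating on the executors. A minimal sketch of pairing the persist with an unpersist at the end of the block (illustrative only; `stream` stands for the mapWithState DStream from the job):

```
import org.apache.spark.storage.StorageLevel

stream.foreachRDD { rdd =>
  // Cache the batch RDD because it is reused by several actions in the block.
  rdd.persist(StorageLevel.MEMORY_AND_DISK)
  try {
    if (!rdd.isEmpty()) {
      // ... the Redis and Kafka writes from the original job go here ...
    }
  } finally {
    // Drop the cached blocks before the next batch so they do not pile up.
    rdd.unpersist()
  }
}
```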







Re: spark streaming executors memory increasing and executor killed by yarn

2017-03-16 Thread Yong Zhang
With this kind of question, you always want to tell us the Spark version.


Yong
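
For reference, a quick way to log it from inside the job (assuming `ssc` is the StreamingContext from the original post); `spark-submit --version` on the command line works too:

```
// Prints the Spark version the job is actually running on.
println(s"Spark version: ${ssc.sparkContext.version}")
```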





spark streaming executors memory increasing and executor killed by yarn

2017-03-16 Thread darin
Hi,
I get this exception after the streaming program has run for some hours.

```
User class threw exception: org.apache.spark.SparkException: Job aborted
due to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
```
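
For context, the 3.5 GB in that message is the YARN container size for the executor: executor memory plus spark.yarn.executor.memoryOverhead, which defaults to 10% of executor memory with a 384 MB floor, rounded up to the cluster's YARN allocation increment. A back-of-the-envelope sketch; the 512 MB increment is an assumed cluster setting, not something stated in the thread:

```
// Rough reconstruction of the container limit (values in MB).
val executorMemoryMb = 3 * 1024                                        // --executor-memory 3G
val overheadMb       = math.max(384, (executorMemoryMb * 0.10).toInt)  // default memoryOverhead
val requestedMb      = executorMemoryMb + overheadMb                   // 3456 MB
val yarnIncrementMb  = 512                                             // assumed yarn.scheduler.minimum-allocation-mb
val containerMb      = math.ceil(requestedMb.toDouble / yarnIncrementMb).toInt * yarnIncrementMb
// containerMb == 3584 MB, i.e. the 3.5 GB limit reported by YARN
```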

I have googled some solutions, like disabling the YARN memory check or increasing
executor memory... I don't think that is the right way to fix it.


And this is the submit script:
```
spark-submit --master yarn-cluster --driver-cores 1 --driver-memory 1G \
  --num-executors 6 --executor-cores 3 --executor-memory 3G \
  --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/javadump.hprof" \
  --conf "spark.kryoserializer.buffer.max=512m" \
  --class com.dtise.data.streaming.ad.DTStreamingStatistics \
  hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar
```

And this is the main code:

```
val originalStream = ssc.textFileStream(rawDataPath)
originalStream.repartition(10)
  .mapPartitions(parseAdLog)
  .reduceByKey(_ ++ _)
  .mapWithState(StateSpec.function(countAdLogWithState _))
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      val batchTime = Calendar.getInstance.getTimeInMillis
      val dimensionSumMap = rdd.map(_._1).reduce(_ ++ _)
      val nameList = rdd.map(_._2).reduce(_ ++ _).toList

      val jedis = RedisUtils.jedis()
      jedis.hmset(joinString("t_ad_dimension_sum", batchTime), dimensionSumMap)
      jedis.lpush(joinString("t_ad_name", batchTime), nameList: _*)
      jedis.set(joinString("t_ad", batchTime.toString), "OK")
      jedis.close()

      rdd.flatMap(_._3).foreachPartition(logInfoList => {
        val producter = new StringProducter
        for (logInfo <- logInfoList) {
          val logInfoArr = logInfo.split("\t", -1)
          val kafkaKey = "ad/" + logInfoArr(campaignIdIdx) + "/" + logInfoArr(logDateIdx)
          producter.send("cookedLog", kafkaKey, logInfo)
        }
        producter.close()
      })
    }
  })
```
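
One thing that can make executor memory grow steadily in a mapWithState job is state that is never expired; whether that applies here is only a guess. A minimal sketch of adding an idle timeout to the existing StateSpec (the 60-minute value is purely illustrative):

```
import org.apache.spark.streaming.{Minutes, StateSpec}

// Expire state for keys that receive no updates for 60 minutes (illustrative value);
// countAdLogWithState is the mapping function from the job above.
val countSpec = StateSpec.function(countAdLogWithState _).timeout(Minutes(60))

originalStream.repartition(10)
  .mapPartitions(parseAdLog)
  .reduceByKey(_ ++ _)
  .mapWithState(countSpec)
```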

These are the JVM heap MAT results:

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095238%402x.png>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095254%402x.png>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095331%402x.png>

Anybody have any advice about this?
Thanks




