Hi Sourav,

Can you post your updateFunc as well, please?

Regards,
Olivier.

On Tue, Apr 21, 2015 at 12:48, Sourav Chandra <sourav.chan...@livestream.com> wrote:

> Hi,
>
> We are building a Spark Streaming application which reads from Kafka, does
> updateStateByKey based on the received message type, and finally stores the
> result into Redis.
>
> After running for a few seconds, the executor process gets killed with an
> OutOfMemory error.
>
> The code snippet is below:
>
> NoOfReceiverInstances = 1
>
> val kafkaStreams = (1 to NoOfReceiverInstances).map(
>   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
> )
> val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}
>
> ssc.union(kafkaStreams)
>   .map(KafkaMessageMapper(_))
>   .filter(...)
>   .updateStateByKey(updateFunc)
>   .foreachRDD(_.foreachPartition(RedisHelper.update(_)))
>
> object RedisHelper {
>   private val client = scredis.Redis(
>     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
>   )
>
>   def update(itr: Iterator[(String, (Long, Long))]) {
>     // redis save operation
>   }
> }
>
> Below is the Spark configuration:
>
> spark.app.name = "XXXXXXX"
> spark.jars = "xxxx.jar"
> spark.home = "/spark-1.1.1-bin-hadoop2.4"
> spark.executor.memory = 1g
> spark.streaming.concurrentJobs = 1000
> spark.logConf = true
> spark.cleaner.ttl = 3600 // in milliseconds
> spark.default.parallelism = 12
> spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError"
> spark.executor.logs.rolling.strategy = "size"
> spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
> spark.executor.logs.rolling.maxRetainedFiles = 10
> spark.serializer = "org.apache.spark.serializer.KryoSerializer"
> spark.kryo.registrator = "xxx.NoOpKryoRegistrator"
>
> Other configurations are below:
>
> streaming {
>   // All streaming context related configs should come here
>   batch-duration = "1 second"
>   checkpoint-directory = "/tmp"
>   checkpoint-duration = "10 seconds"
>   slide-duration = "1 second"
>   window-duration = "1 second"
>   partitions-for-shuffle-task = 32
> }
> kafka {
>   no-of-receivers = 1
>   zookeeper-quorum = "xxxx:2181"
>   consumer-group = "xxxxx"
>   topic = "xxxxx:2"
> }
>
> We tried different combinations:
> - with Spark 1.1.0 and 1.1.1
> - increasing the executor memory
> - changing the serialization strategy (switching between Kryo and plain Java)
> - changing the broadcast strategy (switching between HTTP and torrent broadcast)
>
> Can anyone give any insight into what we are missing here? How can we fix this?
>
> Due to an Akka version mismatch with some other libraries we cannot upgrade
> the Spark version.
>
> Thanks,
> --
>
> Sourav Chandra
>
> Senior Software Engineer
>
> sourav.chan...@livestream.com
>
> o: +91 80 4121 8723
>
> m: +91 988 699 3746
>
> skype: sourav.chandra
>
> Livestream
>
> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
> Block, Koramangala Industrial Area,
>
> Bangalore 560034
>
> www.livestream.com
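
Since the update function body is elided above ({...}), here is a minimal sketch, purely for illustration, of what a function with that signature typically looks like. The field on IConcurrentUsers and the meaning of the (Long, Long) state are assumptions for the sketch, not Sourav's actual code.

  // Hypothetical stand-in: the real IConcurrentUsers lives in Sourav's codebase.
  // Assumed here to expose the number of concurrent users carried by one event.
  trait IConcurrentUsers { def count: Long }

  // Sketch of an updateStateByKey function with the quoted signature,
  // assuming the (Long, Long) state means (runningCount, lastUpdatedMillis).
  val updateFunc: (Seq[IConcurrentUsers], Option[(Long, Long)]) => Option[(Long, Long)] =
    (values, state) => {
      val (runningCount, _) = state.getOrElse((0L, 0L))
      // Fold this batch's events into the previous count.
      val updated = values.foldLeft(runningCount)(_ + _.count)
      // Returning Some keeps the key's state; returning None drops it.
      Some((updated, System.currentTimeMillis()))
    }

One thing worth checking once the real function is posted: with updateStateByKey, a key stays in the state RDD until its update function returns None, so unbounded key growth is one common cause of the kind of executor OOM described above.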