Hi Sourav,
Can you post your updateFunc as well, please?

Regards,

Olivier.

On Tue, Apr 21, 2015 at 12:48, Sourav Chandra <sourav.chan...@livestream.com>
wrote:

> Hi,
>
> We are building a Spark Streaming application which reads from Kafka, does
> updateStateByKey based on the received message type, and finally stores the
> results into Redis.
>
> After running for a few seconds, the executor process gets killed with an
> OutOfMemory error.
>
> The code snippet is below:
>
>
> val NoOfReceiverInstances = 1
>
> val kafkaStreams = (1 to NoOfReceiverInstances).map(
>   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
> )
> val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}
>
> ssc.union(kafkaStreams)
>   .map(KafkaMessageMapper(_))
>   .filter(...)
>   .updateStateByKey(updateFunc)
>   .foreachRDD(_.foreachPartition(RedisHelper.update(_)))
>
> object RedisHelper {
>   private val client = scredis.Redis(
>     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
>   )
>
>   def update(itr: Iterator[(String, (Long, Long))]) {
>     // redis save operation
>   }
> }
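>
> (The actual updateFunc body is omitted above. For context only, a generic
> updateStateByKey function has the shape below; the delta field on
> IConcurrentUsers and the (count, lastUpdated) state layout are assumptions
> for illustration, not our real code.)
>
> val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
>   // previous state: (current concurrent-user count, last update time in millis)
>   val (count, _) = state.getOrElse((0L, 0L))
>   // delta is an assumed field carrying the change in concurrent users per event
>   val newCount = count + values.map(_.delta).sum
>   // returning Some keeps the key in the state RDD; returning None would drop it
>   Some((newCount, System.currentTimeMillis()))
> }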
>
>
> Below is the Spark configuration:
>
>     spark.app.name = "XXXXXXX"
>     spark.jars = "xxxx.jar"
>     spark.home = "/spark-1.1.1-bin-hadoop2.4"
>     spark.executor.memory = 1g
>     spark.streaming.concurrentJobs = 1000
>     spark.logConf = true
>     spark.cleaner.ttl = 3600 // in seconds
>     spark.default.parallelism = 12
>     spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails
>       -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof
>       -XX:+HeapDumpOnOutOfMemoryError"
>     spark.executor.logs.rolling.strategy = "size"
>     spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
>     spark.executor.logs.rolling.maxRetainedFiles = 10
>     spark.serializer = "org.apache.spark.serializer.KryoSerializer"
>     spark.kryo.registrator = "xxx.NoOpKryoRegistrator"
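>
> (xxx.NoOpKryoRegistrator is, as the name suggests, an empty registrator;
> a minimal sketch of such a class:)
>
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class NoOpKryoRegistrator extends KryoRegistrator {
>   // registers nothing; Kryo falls back to its default handling for all classes
>   override def registerClasses(kryo: Kryo): Unit = {}
> }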
>
>
> Other configurations are below:
>
>   streaming {
>     // All streaming context related configs should come here
>     batch-duration = "1 second"
>     checkpoint-directory = "/tmp"
>     checkpoint-duration = "10 seconds"
>     slide-duration = "1 second"
>     window-duration = "1 second"
>     partitions-for-shuffle-task = 32
>   }
>   kafka {
>     no-of-receivers = 1
>     zookeeper-quorum = "xxxx:2181"
>     consumer-group = "xxxxx"
>     topic = "xxxxx:2"
>   }
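>
> (For reference, a minimal sketch of how the streaming config above is assumed
> to be wired into the StreamingContext; the actual wiring code is not shown:)
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> val conf = new SparkConf()
>   .setAppName("XXXXXXX")
>   .set("spark.executor.memory", "1g")   // plus the remaining spark.* settings above
>
> val ssc = new StreamingContext(conf, Seconds(1))   // batch-duration = "1 second"
> ssc.checkpoint("/tmp")   // checkpoint-directory; updateStateByKey requires checkpointing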
>
> We tried different combinations, such as:
>  - running with Spark 1.1.0 and 1.1.1
>  - increasing the executor memory
>  - changing the serialization strategy (switching between Kryo and
>    standard Java serialization)
>  - changing the broadcast strategy (switching between HTTP and torrent
>    broadcast); the corresponding properties are sketched below
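>
> (Concretely, the serializer and broadcast switches correspond to these Spark
> 1.1 properties, sketched here on a SparkConf:)
>
> import org.apache.spark.SparkConf
>
> val conf = new SparkConf()
>   // Kryo vs. plain Java serialization
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   // .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
>   // torrent vs. HTTP broadcast
>   .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
>   // .set("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")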
>
>
> Can anyone give any insight into what we are missing here? How can we fix this?
>
> Due to an Akka version mismatch with some other libraries, we cannot upgrade
> the Spark version.
>
> Thanks,
> --
>
> Sourav Chandra
>
> Senior Software Engineer
>
> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>
> sourav.chan...@livestream.com
>
> o: +91 80 4121 8723
>
> m: +91 988 699 3746
>
> skype: sourav.chandra
>
> Livestream
>
> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
> Block, Koramangala Industrial Area,
>
> Bangalore 560034
>
> www.livestream.com
>
