Hi Olivier, the update function is below:
val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
  val previousCount = state.getOrElse((0L, 0L))._2
  var startValue: IConcurrentUsers = ConcurrentViewers(0)
  var currentCount = 0L
  val lastIndexOfConcurrentUsers =
    values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
  val subList = values.slice(0, lastIndexOfConcurrentUsers)
  val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
  val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count

  if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
    logger.error(
      s"Count using state updation $currentCountFromSubList, " +
        s"ConcurrentUsers count $lastConcurrentViewersCount" +
        s" resetting to $lastConcurrentViewersCount"
    )
    currentCount = lastConcurrentViewersCount
  }

  val remainingValuesList = values.diff(subList)
  startValue = ConcurrentViewers(currentCount)
  currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count

  if (currentCount < 0) {
    logger.error(
      s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
    )
    currentCount = 0
  }

  // to stop pushing subsequent 0 after receiving first 0
  if (currentCount == 0 && previousCount == 0) None
  else Some((previousCount, currentCount))
}

trait IConcurrentUsers {
  val count: Long
  def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
}

object IConcurrentUsers {
  def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
    case (_, _: ConcurrentViewers) =>
      ConcurrentViewers(b.count)
    case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
      ConcurrentViewers(a.count + b.count)
    case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
      ConcurrentViewers(a.count - b.count)
  }
}

case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class ConcurrentViewers(count: Long) extends IConcurrentUsers
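For reference, here is a minimal standalone sketch of how the foldLeft over IConcurrentUsers is expected to behave. The event sequence is made up purely for illustration (it is not taken from our Kafka stream) and can be run in a plain Scala REPL with the definitions above in scope:

// Hypothetical event sequence, invented only to illustrate the fold semantics
val events: Seq[IConcurrentUsers] = Seq(
  ConcurrentViewers(10),          // absolute snapshot: count becomes 10
  IncrementConcurrentViewers(2),  // 10 + 2 = 12
  DecrementConcurrentViewers(1)   // 12 - 1 = 11
)

// Same fold shape as in updateFunc: start from ConcurrentViewers(0) and apply op pairwise
val folded = events.foldLeft(ConcurrentViewers(0): IConcurrentUsers)(_ op _)
// folded.count == 11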
Also, the error stack trace copied from the executor logs is:

java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
        at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
        at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
        at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
        at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
        at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
        at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
        at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
15/04/21 15:51:23 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]

On Wed, Apr 22, 2015 at 1:32 AM, Olivier Girardot <ssab...@gmail.com> wrote:

> Hi Sourav,
> Can you post your updateFunc as well please?
>
> Regards,
>
> Olivier.
>
> On Tue, Apr 21, 2015 at 12:48, Sourav Chandra <sourav.chan...@livestream.com> wrote:
>
>> Hi,
>>
>> We are building a Spark Streaming application which reads from Kafka,
>> does updateStateByKey based on the received message type, and finally
>> stores the result into Redis.
>>
>> After running for a few seconds the executor process gets killed with an
>> OutOfMemoryError.
>>
>> The code snippet is below:
>>
>> NoOfReceiverInstances = 1
>>
>> val kafkaStreams = (1 to NoOfReceiverInstances).map(
>>   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
>> )
>> val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}
>>
>> ssc.union(kafkaStreams)
>>   .map(KafkaMessageMapper(_))
>>   .filter(...)
>>   .updateStateByKey(updateFunc)
>>   .foreachRDD(_.foreachPartition(RedisHelper.update(_)))
>>
>> object RedisHelper {
>>   private val client = scredis.Redis(
>>     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
>>   )
>>
>>   def update(itr: Iterator[(String, (Long, Long))]) {
>>     // redis save operation
>>   }
>> }
>>
>> Below is the Spark configuration:
>>
>> spark.app.name = "XXXXXXX"
>> spark.jars = "xxxx.jar"
>> spark.home = "/spark-1.1.1-bin-hadoop2.4"
>> spark.executor.memory = 1g
>> spark.streaming.concurrentJobs = 1000
>> spark.logConf = true
>> spark.cleaner.ttl = 3600 // in milliseconds
>> spark.default.parallelism = 12
>> spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError"
>> spark.executor.logs.rolling.strategy = "size"
>> spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
>> spark.executor.logs.rolling.maxRetainedFiles = 10
>> spark.serializer = "org.apache.spark.serializer.KryoSerializer"
>> spark.kryo.registrator = "xxx.NoOpKryoRegistrator"
>>
>> Other configurations are below:
>>
>> streaming {
>>   // All streaming context related configs should come here
>>   batch-duration = "1 second"
>>   checkpoint-directory = "/tmp"
>>   checkpoint-duration = "10 seconds"
>>   slide-duration = "1 second"
>>   window-duration = "1 second"
>>   partitions-for-shuffle-task = 32
>> }
>> kafka {
>>   no-of-receivers = 1
>>   zookeeper-quorum = "xxxx:2181"
>>   consumer-group = "xxxxx"
>>   topic = "xxxxx:2"
>> }
>>
>> We tried different combinations:
>> - Spark 1.1.0 and 1.1.1
>> - increasing executor memory
>> - changing the serialization strategy (switching between Kryo and plain Java serialization)
>> - changing the broadcast strategy (switching between HTTP and torrent broadcast)
>>
>> Can anyone give any insight into what we are missing here? How can we fix this?
>>
>> Due to an Akka version mismatch with some other libraries we cannot upgrade the Spark version.
>>
>> Thanks,
>> --
>> Sourav Chandra
>> Senior Software Engineer
>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>> sourav.chan...@livestream.com
>> o: +91 80 4121 8723
>> m: +91 988 699 3746
>> skype: sourav.chandra
>> Livestream
>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,
>> Bangalore 560034
>> www.livestream.com
>

--
Sourav Chandra
Senior Software Engineer
· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
sourav.chan...@livestream.com
o: +91 80 4121 8723
m: +91 988 699 3746
skype: sourav.chandra
Livestream
"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,
Bangalore 560034
www.livestream.com