Hi Olivier,
The update function is as follows:
val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
  val previousCount = state.getOrElse((0L, 0L))._2
  var startValue: IConcurrentUsers = ConcurrentViewers(0)
  var currentCount = 0L
  // NOTE: this assumes every batch contains at least one ConcurrentViewers
  // event; lastIndexWhere returns -1 otherwise (updateStateByKey also invokes
  // the function with an empty Seq for existing keys), and values(-1) would throw.
  val lastIndexOfConcurrentUsers =
    values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
  val subList = values.slice(0, lastIndexOfConcurrentUsers)
  val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
  val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count
  if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
    logger.error(
      s"Count using state update $currentCountFromSubList, " +
      s"ConcurrentUsers count $lastConcurrentViewersCount," +
      s" resetting to $lastConcurrentViewersCount"
    )
    currentCount = lastConcurrentViewersCount
  }
  val remainingValuesList = values.diff(subList)
  startValue = ConcurrentViewers(currentCount)
  currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count
  if (currentCount < 0) {
    logger.error(
      s"ERROR: Got new count $currentCount < 0, value: $values, state: $state, resetting to 0"
    )
    currentCount = 0
  }
  // stop pushing subsequent 0s after the first 0 has been emitted
  if (currentCount == 0 && previousCount == 0) None
  else Some((previousCount, currentCount))
}

trait IConcurrentUsers {
  val count: Long
  def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
}

object IConcurrentUsers {
  // The left operand (the fold accumulator) is always a ConcurrentViewers in
  // practice, since every fold above starts from one, so the match does not
  // need increment/decrement combinations on the left.
  def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
    case (_, _: ConcurrentViewers) =>
      ConcurrentViewers(b.count)
    case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
      ConcurrentViewers(a.count + b.count)
    case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
      ConcurrentViewers(a.count - b.count)
  }
}

case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class ConcurrentViewers(count: Long) extends IConcurrentUsers
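
For reference, here is a tiny standalone check of the fold semantics (illustrative
only, not part of the job): an absolute ConcurrentViewers snapshot resets the
running count, while increments and decrements adjust it.

  val events: Seq[IConcurrentUsers] = Seq(
    IncrementConcurrentViewers(3),
    DecrementConcurrentViewers(1),
    ConcurrentViewers(5),            // absolute snapshot: resets the count to 5
    IncrementConcurrentViewers(2)
  )
  val folded = events.foldLeft(ConcurrentViewers(0): IConcurrentUsers)(_ op _)
  // folded == ConcurrentViewers(7)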
Also, the error stack trace copied from the executor logs is:
java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
    at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
    at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
    at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
    at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
15/04/21 15:51:23 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
On Wed, Apr 22, 2015 at 1:32 AM, Olivier Girardot <[email protected]> wrote:
> Hi Sourav,
> Can you post your updateFunc as well please ?
>
> Regards,
>
> Olivier.
>
> On Tue, Apr 21, 2015 at 12:48, Sourav Chandra <[email protected]> wrote:
>
>> Hi,
>>
>> We are building a Spark Streaming application that reads from Kafka,
>> does updateStateByKey based on the received message type, and finally
>> stores the results into Redis.
>>
>> After running for a few seconds, the executor process gets killed with
>> an OutOfMemoryError.
>>
>> The code snippet is below:
>>
>>
>> NoOfReceiverInstances = 1
>>
>> val kafkaStreams = (1 to NoOfReceiverInstances).map(
>>   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
>> )
>> val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => { ... }
>>
>> ssc.union(kafkaStreams)
>>   .map(KafkaMessageMapper(_))
>>   .filter(...)
>>   .updateStateByKey(updateFunc)
>>   .foreachRDD(_.foreachPartition(RedisHelper.update(_)))
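>>
>> (Note: updateStateByKey requires checkpointing to be enabled. The exact
>> context setup is not shown above; a minimal sketch of what this pipeline
>> assumes, with illustrative names:)
>>
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>
>> val conf = new SparkConf()            // populated from the properties below
>> val ssc = new StreamingContext(conf, Seconds(1))
>> ssc.checkpoint("/tmp")                // matches checkpoint-directory below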
>>
>>
>>
>> object RedisHelper {
>>   private val client = scredis.Redis(
>>     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
>>   )
>>
>>   def update(itr: Iterator[(String, (Long, Long))]) {
>>     // redis save operation
>>   }
>> }
>>
>>
>> Below is the Spark configuration:
>>
>>
>> spark.app.name = "XXXXXXX"
>> spark.jars = "xxxx.jar"
>> spark.home = "/spark-1.1.1-bin-hadoop2.4"
>> spark.executor.memory = 1g
>> spark.streaming.concurrentJobs = 1000
>> spark.logConf = true
>> spark.cleaner.ttl = 3600 // in seconds
>> spark.default.parallelism = 12
>> spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails
>>   -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError"
>> spark.executor.logs.rolling.strategy = "size"
>> spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
>> spark.executor.logs.rolling.maxRetainedFiles = 10
>> spark.serializer = "org.apache.spark.serializer.KryoSerializer"
>> spark.kryo.registrator = "xxx.NoOpKryoRegistrator"
>>
>>
>> Other configurations are below:
>>
>> streaming {
>>   // All streaming-context-related configs should come here
>>   batch-duration = "1 second"
>>   checkpoint-directory = "/tmp"
>>   checkpoint-duration = "10 seconds"
>>   slide-duration = "1 second"
>>   window-duration = "1 second"
>>   partitions-for-shuffle-task = 32
>> }
>> kafka {
>>   no-of-receivers = 1
>>   zookeeper-quorum = "xxxx:2181"
>>   consumer-group = "xxxxx"
>>   topic = "xxxxx:2"
>> }
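>>
>> (These duration keys are our own application config, not Spark settings.
>> A minimal sketch, assuming Typesafe Config, of how such keys can be mapped
>> to Spark streaming durations; this is illustrative, not the exact app code:)
>>
>> import java.util.concurrent.TimeUnit
>> import com.typesafe.config.ConfigFactory
>> import org.apache.spark.streaming.Milliseconds
>>
>> val streamingConf = ConfigFactory.load().getConfig("streaming")
>> val batchDuration =
>>   Milliseconds(streamingConf.getDuration("batch-duration", TimeUnit.MILLISECONDS))
>> // e.g. new StreamingContext(conf, batchDuration)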
>>
>> We tried different combinations:
>> - Spark 1.1.0 and 1.1.1
>> - increasing executor memory
>> - changing the serialization strategy (switching between Kryo and plain
>>   Java serialization)
>> - changing the broadcast strategy (switching between HTTP and torrent
>>   broadcast; see the note below)
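>>
>> (Concretely, the broadcast toggle above is the spark.broadcast.factory
>> setting, switched between the two Spark 1.x implementations:
>> spark.broadcast.factory = "org.apache.spark.broadcast.HttpBroadcastFactory"
>> vs. "org.apache.spark.broadcast.TorrentBroadcastFactory".)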
>>
>>
>> Can anyone give any insight into what we are missing here? How can we fix it?
>>
>> Due to an Akka version mismatch with some other libraries, we cannot
>> upgrade the Spark version.
>>
>> Thanks,
>> --
>> Sourav Chandra
>> Senior Software Engineer, Livestream
>>
>
--
Sourav Chandra
Senior Software Engineer
· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
[email protected]
o: +91 80 4121 8723
m: +91 988 699 3746
skype: sourav.chandra
Livestream
"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,
Bangalore 560034
www.livestream.com