It could very well be that your executor memory is not enough to hold the
state RDDs and also operate on the data; 1 GB per executor is quite low.
Definitely give the executors more memory. Also, have you tried increasing
the number of partitions (you can specify the number of partitions in
updateStateByKey)?
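
As a rough sketch of both suggestions (not the actual job from this thread):
the snippet below raises spark.executor.memory and passes an explicit
partition count as the second argument to updateStateByKey. The 4g and 32
values, the socket source, the checkpoint path, and the running-count update
function are all placeholders chosen for illustration, and the master is
assumed to be supplied by spark-submit.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Brings the pair-DStream implicits into scope (needed on Spark 1.1.x).
import org.apache.spark.streaming.StreamingContext._

object StateByKeySketch {
  // Hypothetical update function: keeps a running total per key.
  val updateFunc: (Seq[Long], Option[Long]) => Option[Long] =
    (values, state) => Some(state.getOrElse(0L) + values.sum)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("state-by-key-sketch")
      // Illustrative value only; size it to your state and batch volume.
      .set("spark.executor.memory", "4g")

    val ssc = new StreamingContext(conf, Seconds(1))
    // updateStateByKey requires a checkpoint directory (placeholder path).
    ssc.checkpoint("/tmp/state-by-key-sketch")

    // Placeholder source: one key per line of text, each counted once.
    val events = ssc.socketTextStream("localhost", 9999).map(line => (line, 1L))

    // The second argument sets the number of partitions of the state RDDs.
    val counts = events.updateStateByKey[Long](updateFunc, 32)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

More partitions mean smaller state partitions per task, which can lower the
peak memory each task needs, at the cost of more tasks per batch.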

On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra <
sourav.chan...@livestream.com> wrote:

> Anyone?
>
> On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra <
> sourav.chan...@livestream.com> wrote:
>
> > Hi Olivier,
> >
> > The update function is as below:
> >
> > val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
> >   val previousCount = state.getOrElse((0L, 0L))._2
> >   var startValue: IConcurrentUsers = ConcurrentViewers(0)
> >   var currentCount = 0L
> >   val lastIndexOfConcurrentUsers =
> >     values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
> >   val subList = values.slice(0, lastIndexOfConcurrentUsers)
> >   val currentCountFromSubList =
> >     subList.foldLeft(startValue)(_ op _).count + previousCount
> >   val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count
> >
> >   if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
> >     logger.error(
> >       s"Count using state updation $currentCountFromSubList, " +
> >         s"ConcurrentUsers count $lastConcurrentViewersCount" +
> >         s" resetting to $lastConcurrentViewersCount"
> >     )
> >     currentCount = lastConcurrentViewersCount
> >   }
> >   val remainingValuesList = values.diff(subList)
> >   startValue = ConcurrentViewers(currentCount)
> >   currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count
> >
> >   if (currentCount < 0) {
> >     logger.error(
> >       s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
> >     )
> >     currentCount = 0
> >   }
> >   // to stop pushing subsequent 0 after receiving first 0
> >   if (currentCount == 0 && previousCount == 0) None
> >   else Some(previousCount, currentCount)
> > }
> >
> > trait IConcurrentUsers {
> >   val count: Long
> >   def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
> > }
> >
> > object IConcurrentUsers {
> >   def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
> >     case (_, _: ConcurrentViewers) =>
> >       ConcurrentViewers(b.count)
> >     case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
> >       ConcurrentViewers(a.count + b.count)
> >     case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
> >       ConcurrentViewers(a.count - b.count)
> >   }
> > }
> >
> > case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
> > case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
> > case class ConcurrentViewers(count: Long) extends IConcurrentUsers
> >
> >
> > Also, the error stack trace copied from the executor logs is:
> >
> > java.lang.OutOfMemoryError: Java heap space
> >         at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
> >         at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
> >         at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
> >         at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
> >         at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
> >         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
> >         at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:601)
> >         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
> >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
> >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
> >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
> >         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> >         at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
> >         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
> >         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
> >         at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
> >         at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:601)
> >         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
> >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
> >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
> >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
> >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
> >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
> > 15/04/21 15:51:23 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
> >
> >
> >
> > On Wed, Apr 22, 2015 at 1:32 AM, Olivier Girardot <ssab...@gmail.com>
> > wrote:
> >
> >> Hi Sourav,
> >> Can you post your updateFunc as well, please?
> >>
> >> Regards,
> >>
> >> Olivier.
> >>
> >> On Tue, Apr 21, 2015 at 12:48, Sourav Chandra <
> >> sourav.chan...@livestream.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> We are building a Spark Streaming application which reads from Kafka,
> >>> does updateStateByKey based on the received message type, and finally
> >>> stores the result into Redis.
> >>>
> >>> After running for a few seconds, the executor process gets killed with
> >>> an OutOfMemory error.
> >>>
> >>> The code snippet is below:
> >>>
> >>>
> >>> NoOfReceiverInstances = 1
> >>>
> >>> val kafkaStreams = (1 to NoOfReceiverInstances).map(
> >>>   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
> >>> )
> >>> val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}
> >>>
> >>> ssc.union(kafkaStreams)
> >>>   .map(KafkaMessageMapper(_))
> >>>   .filter(...)
> >>>   .updateStateByKey(updateFunc)
> >>>   .foreachRDD(_.foreachPartition(RedisHelper.update(_)))
> >>>
> >>> object RedisHelper {
> >>>   private val client = scredis.Redis(
> >>>     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
> >>>   )
> >>>
> >>>   def update(itr: Iterator[(String, (Long, Long))]) {
> >>>     // redis save operation
> >>>   }
> >>> }
> >>>
> >>>
> >>> Below is the Spark configuration:
> >>>
> >>>     spark.app.name = "XXXXXXX"
> >>>     spark.jars = "xxxx.jar"
> >>>     spark.home = "/spark-1.1.1-bin-hadoop2.4"
> >>>     spark.executor.memory = 1g
> >>>     spark.streaming.concurrentJobs = 1000
> >>>     spark.logConf = true
> >>>     spark.cleaner.ttl = 3600 // in seconds
> >>>     spark.default.parallelism = 12
> >>>     spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError"
> >>>     spark.executor.logs.rolling.strategy = "size"
> >>>     spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
> >>>     spark.executor.logs.rolling.maxRetainedFiles = 10
> >>>     spark.serializer = "org.apache.spark.serializer.KryoSerializer"
> >>>     spark.kryo.registrator = "xxx.NoOpKryoRegistrator"
> >>>
> >>>
> >>> Other configurations are below:
> >>>
> >>>   streaming {
> >>>     // All streaming context related configs should come here
> >>>     batch-duration = "1 second"
> >>>     checkpoint-directory = "/tmp"
> >>>     checkpoint-duration = "10 seconds"
> >>>     slide-duration = "1 second"
> >>>     window-duration = "1 second"
> >>>     partitions-for-shuffle-task = 32
> >>>   }
> >>>   kafka {
> >>>     no-of-receivers = 1
> >>>     zookeeper-quorum = "xxxx:2181"
> >>>     consumer-group = "xxxxx"
> >>>     topic = "xxxxx:2"
> >>>   }
> >>>
> >>> We tried different combinations, such as:
> >>>  - with Spark 1.1.0 and 1.1.1
> >>>  - by increasing executor memory
> >>>  - by changing the serialization strategy (switching between Kryo and
> >>>    normal Java)
> >>>  - by changing the broadcast strategy (switching between HTTP and
> >>>    torrent broadcast)
> >>>
> >>>
> >>> Can anyone give any insight into what we are missing here? How can we
> >>> fix this?
> >>>
> >>> Due to an Akka version mismatch with some other libraries, we cannot
> >>> upgrade the Spark version.
> >>>
> >>> Thanks,
> >>> --
> >>>
> >>> Sourav Chandra
> >>
> >
> >
> > --
> >
> > Sourav Chandra
> >
>
>
>
> --
>
> Sourav Chandra
>
> Senior Software Engineer
>
> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>
> sourav.chan...@livestream.com
>
> o: +91 80 4121 8723
>
> m: +91 988 699 3746
>
> skype: sourav.chandra
>
> Livestream
>
> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
> Block, Koramangala Industrial Area,
>
> Bangalore 560034
>
> www.livestream.com
>
