Hi TD,
Some observations:
1. If I submit the application with spark-submit using *client as
deploy mode*, it works fine with a single master and one worker (driver,
master and worker all run on the same machine).
2. If I submit the application with spark-submit using client as
deploy mode, it *crashes after some time with a StackOverflowError* when
there is a *single master and 2 workers* (driver, master and one worker run
on the same machine; the other worker is on a different machine):
15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0 (TID 5412)
java.lang.StackOverflowError
    at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
    at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
    at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
3. If I submit the application with spark-submit using *cluster as
deploy mode*, it *crashes after some time with an OutOfMemoryError* with a
single master and one worker (driver, master and worker all run on the same
machine):
15/04/23 05:47:49 Executor: Exception in task 0.0 in stage 858.0 (TID 1464)
java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
    at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
    at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
    at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
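
A note on the trace in observation 2: the repeating readObject0 / defaultReadFields / readSerialData / readOrdinaryObject frames suggest that Java deserialization is recursing once per level of a deeply nested object graph. What that nested object is in this job has not been confirmed. The sketch below only reproduces the general mechanism outside Spark, and on the write side, which recurses in the same way as the read side shown in the trace. `Node`, `DeepGraphDemo`, `chain` and `serializes` are hypothetical names made up for this illustration.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for a deeply nested object graph: each Node
// references the next one, so Java serialization recurses once per link.
final case class Node(id: Int, next: Node)

object DeepGraphDemo {
  // Build the chain iteratively so that construction itself cannot overflow.
  def chain(depth: Int): Node =
    (1 to depth).foldLeft(null: Node)((tail, i) => Node(i, tail))

  // Returns true if serialization completed, false if the stack overflowed.
  def serializes(root: Node): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(root)
      out.close()
      true
    } catch {
      case _: StackOverflowError => false
    }
}
```

With a default-sized thread stack, `DeepGraphDemo.serializes(DeepGraphDemo.chain(100))` should succeed, while a chain of several hundred thousand nodes should overflow. The exact threshold depends on the JVM and its -Xss setting.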
Thanks,
Sourav
On Wed, Apr 22, 2015 at 11:40 PM, Tathagata Das <[email protected]> wrote:
> It could very well be that your executor memory is not enough to store the
> state RDDs AND operate on the data. 1G per executor is quite low.
> Definitely give more memory. And have you tried increasing the number of
> partitions (specify number of partitions in updateStateByKey) ?
>
> On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra <
> [email protected]> wrote:
>
>> Anyone?
>>
>> On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra <
>> [email protected]> wrote:
>>
>> > Hi Olivier,
>> >
>> > The update function is as below:
>> >
>> > val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
>> >   val previousCount = state.getOrElse((0L, 0L))._2
>> >   var startValue: IConcurrentUsers = ConcurrentViewers(0)
>> >   var currentCount = 0L
>> >   val lastIndexOfConcurrentUsers =
>> >     values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
>> >   val subList = values.slice(0, lastIndexOfConcurrentUsers)
>> >   val currentCountFromSubList =
>> >     subList.foldLeft(startValue)(_ op _).count + previousCount
>> >   val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count
>> >
>> >   if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
>> >     logger.error(
>> >       s"Count using state updation $currentCountFromSubList, " +
>> >       s"ConcurrentUsers count $lastConcurrentViewersCount" +
>> >       s" resetting to $lastConcurrentViewersCount"
>> >     )
>> >     currentCount = lastConcurrentViewersCount
>> >   }
>> >   val remainingValuesList = values.diff(subList)
>> >   startValue = ConcurrentViewers(currentCount)
>> >   currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count
>> >
>> >   if (currentCount < 0) {
>> >     logger.error(
>> >       s"ERROR: Got new count $currentCount < 0, value:$values, " +
>> >       s"state:$state, resetting to 0"
>> >     )
>> >     currentCount = 0
>> >   }
>> >   // to stop pushing subsequent 0 after receiving first 0
>> >   if (currentCount == 0 && previousCount == 0) None
>> >   else Some(previousCount, currentCount)
>> > }
>> >
>> > trait IConcurrentUsers {
>> >   val count: Long
>> >   def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
>> > }
>> >
>> > object IConcurrentUsers {
>> >   def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
>> >     case (_, _: ConcurrentViewers) =>
>> >       ConcurrentViewers(b.count)
>> >     case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
>> >       ConcurrentViewers(a.count + b.count)
>> >     case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
>> >       ConcurrentViewers(a.count - b.count)
>> >   }
>> > }
>> >
>> > case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
>> > case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
>> > case class ConcurrentViewers(count: Long) extends IConcurrentUsers
>> >
>> >
>> > Also, the error stack trace copied from the executor logs is:
>> >
>> > java.lang.OutOfMemoryError: Java heap space
>> >     at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
>> >     at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
>> >     at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
>> >     at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
>> >     at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
>> >     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>> >     at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >     at java.lang.reflect.Method.invoke(Method.java:601)
>> >     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
>> >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
>> >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
>> >     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>> >     at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
>> >     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
>> >     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>> >     at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
>> >     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >     at java.lang.reflect.Method.invoke(Method.java:601)
>> >     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
>> >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
>> >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
>> >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
>> >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> > 15/04/21 15:51:23 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
>> >
>> >
>> >
>> > On Wed, Apr 22, 2015 at 1:32 AM, Olivier Girardot <[email protected]>
>> > wrote:
>> >
>> >> Hi Sourav,
>> >> Can you post your updateFunc as well please ?
>> >>
>> >> Regards,
>> >>
>> >> Olivier.
>> >>
>> >> On Tue, Apr 21, 2015 at 12:48, Sourav Chandra <
>> >> [email protected]> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> We are building a Spark Streaming application which reads from Kafka,
>> >>> does updateStateByKey based on the received message type and finally
>> >>> stores the result in Redis.
>> >>>
>> >>> After running for a few seconds, the executor process gets killed with
>> >>> an OutOfMemoryError.
>> >>>
>> >>> The code snippet is below:
>> >>>
>> >>>
>> >>> val NoOfReceiverInstances = 1
>> >>>
>> >>> val kafkaStreams = (1 to NoOfReceiverInstances).map(
>> >>>   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
>> >>> )
>> >>> val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}
>> >>>
>> >>> ssc.union(kafkaStreams)
>> >>>   .map(KafkaMessageMapper(_))
>> >>>   .filter(...)
>> >>>   .updateStateByKey(updateFunc)
>> >>>   .foreachRDD(_.foreachPartition(RedisHelper.update(_)))
>> >>>
>> >>> object RedisHelper {
>> >>>   private val client = scredis.Redis(
>> >>>     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
>> >>>   )
>> >>>
>> >>>   def update(itr: Iterator[(String, (Long, Long))]): Unit = {
>> >>>     // redis save operation
>> >>>   }
>> >>> }
>> >>>
>> >>>
>> >>> Below is the Spark configuration:
>> >>>
>> >>>   spark.app.name = "XXXXXXX"
>> >>>   spark.jars = "xxxx.jar"
>> >>>   spark.home = "/spark-1.1.1-bin-hadoop2.4"
>> >>>   spark.executor.memory = 1g
>> >>>   spark.streaming.concurrentJobs = 1000
>> >>>   spark.logConf = true
>> >>>   spark.cleaner.ttl = 3600 // in seconds
>> >>>   spark.default.parallelism = 12
>> >>>   spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError"
>> >>>   spark.executor.logs.rolling.strategy = "size"
>> >>>   spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
>> >>>   spark.executor.logs.rolling.maxRetainedFiles = 10
>> >>>   spark.serializer = "org.apache.spark.serializer.KryoSerializer"
>> >>>   spark.kryo.registrator = "xxx.NoOpKryoRegistrator"
>> >>>
>> >>> Other configurations are below:
>> >>>
>> >>>   streaming {
>> >>>     // All streaming context related configs should come here
>> >>>     batch-duration = "1 second"
>> >>>     checkpoint-directory = "/tmp"
>> >>>     checkpoint-duration = "10 seconds"
>> >>>     slide-duration = "1 second"
>> >>>     window-duration = "1 second"
>> >>>     partitions-for-shuffle-task = 32
>> >>>   }
>> >>>   kafka {
>> >>>     no-of-receivers = 1
>> >>>     zookeeper-quorum = "xxxx:2181"
>> >>>     consumer-group = "xxxxx"
>> >>>     topic = "xxxxx:2"
>> >>>   }
>>
>> >>>
>> >>> We tried different combinations, such as:
>> >>> - Spark 1.1.0 and 1.1.1
>> >>> - increasing executor memory
>> >>> - changing the serialization strategy (switching between Kryo and
>> >>> plain Java serialization)
>> >>> - changing the broadcast strategy (switching between HTTP and torrent
>> >>> broadcast)
>> >>>
>> >>>
>> >>> Can anyone give any insight into what we are missing here? How can we
>> >>> fix this?
>> >>>
>> >>> Due to an Akka version mismatch with some other libraries, we cannot
>> >>> upgrade the Spark version.
>> >>>
>> >>> Thanks,
>> >>> --
>> >>>
>> >>> Sourav Chandra
>> >>>
>> >>> Senior Software Engineer
>> >>>
>> >>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>> >>>
>> >>> [email protected]
>> >>>
>> >>> o: +91 80 4121 8723
>> >>>
>> >>> m: +91 988 699 3746
>> >>>
>> >>> skype: sourav.chandra
>> >>>
>> >>> Livestream
>> >>>
>> >>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main,
>> 3rd
>> >>> Block, Koramangala Industrial Area,
>> >>>
>> >>> Bangalore 560034
>> >>>
>> >>> www.livestream.com
>> >>>
>> >>
>> >
>> >
>> >
>>
>>
>>
>>
>
>
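
One more observation on the update function quoted above: `values.lastIndexWhere(...)` returns -1 when a batch contains no `ConcurrentViewers` message (the batch can also be an empty `Seq` for a key that already has state), and `values(-1)` then throws an IndexOutOfBoundsException. Whether this is related to the failures reported above is not established. The following is a minimal sketch of a variant that folds over the whole batch instead of indexing into it. It is a simplification under stated assumptions, not the code running in the job: the logging and the consistency check are left out, and `GuardedUpdate` and `step` are hypothetical names.

```scala
// Message types as in the quoted code, restated so the sketch compiles alone.
sealed trait IConcurrentUsers { def count: Long }
final case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
final case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
final case class ConcurrentViewers(count: Long) extends IConcurrentUsers

object GuardedUpdate {
  // Apply one message to a running count: an absolute ConcurrentViewers
  // reading replaces the count, increments and decrements adjust it.
  private def step(running: Long, msg: IConcurrentUsers): Long = msg match {
    case ConcurrentViewers(n)          => n
    case IncrementConcurrentViewers(n) => running + n
    case DecrementConcurrentViewers(n) => running - n
  }

  // Same (values, state) => Option[state] shape as the quoted updateFunc.
  val updateFunc: (Seq[IConcurrentUsers], Option[(Long, Long)]) => Option[(Long, Long)] =
    (values, state) => {
      val previousCount = state.map(_._2).getOrElse(0L)
      // Folding handles empty batches and batches without ConcurrentViewers.
      val folded = values.foldLeft(previousCount)(step)
      // Clamp negative results to 0, as the quoted code does.
      val currentCount = math.max(folded, 0L)
      // Stop emitting state once the count has stayed at 0.
      if (currentCount == 0 && previousCount == 0) None
      else Some((previousCount, currentCount))
    }
}
```

Because the fold starts from the previous count and every `ConcurrentViewers` reading overwrites the running value, the result for a batch that does contain such a reading is the last reading plus the adjustments that follow it, which matches what the quoted function computes in that case.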