Glad to hear that. :)

On Thu, Jun 18, 2015 at 6:25 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
Hi,

We switched from ParallelGC to CMS, and the symptom is gone.

On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

I set spark.shuffle.io.preferDirectBufs to false in SparkConf, and the setting shows up in the web UI's Environment tab. But it still eats memory: -Xmx is set to 512M, yet RES grows to 1.5G in half a day.

On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:

Could you set "spark.shuffle.io.preferDirectBufs" to false to turn off the off-heap allocation of netty?

Best Regards,
Shixiong Zhu

2015-06-03 11:58 GMT+08:00 Ji ZHANG <zhangj...@gmail.com>:

Hi,

Thanks for your information. I'll give Spark 1.4 a try when it's released.

On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das <t...@databricks.com> wrote:

Could you try it out with Spark 1.4 RC3?

Also pinging Cloudera folks; they may be aware of something.

BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe with a script), take histogram snapshots and also do memory dumps, say every hour. Then compare two histos/dumps that are a few hours apart (the more, the better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM, which will show the objects that got added in the later dump. That makes it easy to see what is not getting cleaned up.

TD
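The snapshot-and-diff loop TD describes is easy to script. As a hedged illustration (none of this comes from the thread; the HistoDiff object and the two command-line file arguments are hypothetical), a minimal Scala sketch that diffs two saved outputs of jmap -histo:live and prints the classes that grew the most:

    import scala.io.Source
    import scala.util.Try

    object HistoDiff {
      // A data line of `jmap -histo` output looks like:
      //   "  10:   141312   3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry"
      // Return class name -> bytes, skipping header, separator and total lines.
      def parse(path: String): Map[String, Long] =
        Source.fromFile(path).getLines()
          .map(_.trim.split("\\s+"))
          .collect {
            case Array(_, _, bytes, name) if Try(bytes.toLong).isSuccess =>
              name -> bytes.toLong
          }
          .toMap

      def main(args: Array[String]): Unit = {
        val Array(before, after) = args.map(parse) // two saved histo files
        (before.keySet ++ after.keySet).toSeq
          .map(name => name -> (after.getOrElse(name, 0L) - before.getOrElse(name, 0L)))
          .filter(_._2 > 0)  // keep only classes whose footprint grew
          .sortBy(-_._2)     // biggest growth first
          .take(30)
          .foreach { case (name, delta) => println(f"$delta%12d  $name") }
      }
    }

A class whose byte count keeps growing across successive diffs is a good leak suspect.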
On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

Thanks for your reply. Here are the top 30 entries of the jmap -histo:live result:

     num     #instances         #bytes  class name
    ----------------------------------------------
       1:         40802      145083848  [B
       2:         99264       12716112  <methodKlass>
       3:         99264       12291480  <constMethodKlass>
       4:          8472        9144816  <constantPoolKlass>
       5:          8472        7625192  <instanceKlassKlass>
       6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
       7:          7045        4804832  <constantPoolCacheKlass>
       8:        139168        4453376  java.util.HashMap$Entry
       9:          9427        3542512  <methodDataKlass>
      10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
      11:        135491        3251784  java.lang.Long
      12:         26192        2765496  [C
      13:           813        1140560  [Ljava.util.HashMap$Entry;
      14:          8997        1061936  java.lang.Class
      15:         16022         851384  [[I
      16:         16447         789456  java.util.zip.Inflater
      17:         13855         723376  [S
      18:         17282         691280  java.lang.ref.Finalizer
      19:         25725         617400  java.lang.String
      20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
      21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
      22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
      23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
      24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
      25:         16447         394728  java.util.zip.ZStreamRef
      26:           565         370080  [I
      27:           508         272288  <objArrayKlassKlass>
      28:         16233         259728  java.lang.Object
      29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
      30:          2524         192312  [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data, and I can't find a way to figure out what is in there.

Besides, I did some extra experiments, i.e. ran the same program in different environments to test whether it has the off-heap memory issue:

    spark1.0 + standalone = no
    spark1.0 + yarn       = no
    spark1.3 + standalone = no
    spark1.3 + yarn       = yes

I'm using CDH5.1, so spark1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

I could use spark1.0 + yarn, but I can't find a way to manage the logs (level and rolling), so they would fill up the hard drive.

Currently I'll stick to spark1.0 + standalone, until our ops team decides to upgrade CDH.

On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com> wrote:

While it is running, is it possible for you to log into the YARN node and get histograms of live objects using "jmap -histo:live"? That may reveal something.

On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

Unfortunately, they're still growing, both driver and executors.

I ran the same job in local mode, and everything is fine.

On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Can you replace your counting part with this?

    logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))

Thanks
Best Regards

On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

I wrote a simple test job; it only does very basic operations, for example:

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)

    val logs = lines.flatMap { line =>
      try {
        Some(parse(line).extract[Impression])
      } catch {
        case _: Exception => None
      }
    }

    logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        iter.foreach(count => logger.info(count.toString))
      }
    }

It receives messages from Kafka, parses the JSON, filters and counts the records, and then prints the counts to the logs.

Thanks.

On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Hi Zhang,

Could you paste your code in a gist? Not sure what you are doing inside the code to fill up memory.

Thanks
Best Regards

On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

Yes, I'm using createStream, but the storageLevel param is by default MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing, and I don't think Kafka messages would be cached in the driver.
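The storage level being discussed here is an explicit parameter of the receiver-based API. A hedged sketch, adapted from the test job quoted above (not code posted in the thread), of overriding the default with the MEMORY_AND_DISK level Akhil suggests below:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Same receiver-based stream as in the test job, but with the storage
    // level overridden from the default MEMORY_AND_DISK_SER_2.
    val lines = KafkaUtils.createStream(
      ssc, zkQuorum, group, Map(topic -> 1),
      StorageLevel.MEMORY_AND_DISK
    ).map(_._2)

Since the default is already a serialized, replicated, disk-spilling variant, this mainly trades memory for CPU and disk; as Akhil notes below, it may slow things down.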
On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Are you using the createStream or createDirectStream api? If it's the former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might slow things down though). Another way would be to try the latter.

Thanks
Best Regards

On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi Akhil,

Thanks for your reply. According to the Streaming tab of the Web UI, the Processing Time is around 400ms and there's no Scheduling Delay, so I suppose it's not the Kafka messages that eat up the off-heap memory. Or maybe it is, but how do I tell?

I googled how to check off-heap memory usage; there's a tool called pmap, but I don't know how to interpret the results.

On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

After submitting the job, if you do a ps aux | grep spark-submit you can see all the JVM params. Are you using the high-level consumer (receiver based) for receiving data from Kafka? In that case, if your throughput is high and the processing delay exceeds the batch interval, you will hit these memory issues, as data keeps being received and dumped to memory. You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down). Another alternative is to use the low-level kafka consumer <https://github.com/dibbhatt/kafka-spark-consumer> or the non-receiver-based directStream <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers> that ships with Spark.

Thanks
Best Regards
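A hedged sketch of the non-receiver-based direct stream mentioned above, again adapted from the test job (the broker address is a placeholder; in Spark 1.3 the direct stream takes a broker list instead of a ZooKeeper quorum):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // No long-running receiver, so incoming blocks are not buffered in
    // executor memory between batches.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set(topic)
    ).map(_._2)

With the direct stream, offsets are tracked by the stream itself (recoverable via checkpoints) rather than by ZooKeeper, so the group and zkQuorum variables from the receiver version are no longer used.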
On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com> wrote:

Hi,

I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I found that YARN is killing the driver and executor processes because of excessive use of memory. Here's what I tried:

1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the extra memory is not used by the heap.
2. I set the two memoryOverhead params to 1024 (default is 384), but the memory just keeps growing and then hits the limit.
3. This problem does not show up in low-throughput jobs, nor in standalone mode.
4. The test job just receives messages from Kafka, with a batch interval of 1, does some filtering and aggregation, and then prints to the executor logs, so it's not some 3rd-party library that causes the 'leak'.

Spark 1.3 was built by myself, with the correct Hadoop version.

Any ideas will be appreciated.

Thanks.

--
Jerry
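To gather the tuning knobs that come up in this thread in one place, a hedged SparkConf sketch (the values are illustrative, not settings the thread confirms; note that -XX:MaxDirectMemorySize caps only NIO direct buffers, not all native memory):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Give YARN containers more headroom for off-heap usage (Zhang tried 1024 above).
      .set("spark.yarn.driver.memoryOverhead", "1024")
      .set("spark.yarn.executor.memoryOverhead", "1024")
      // Shixiong's suggestion: make netty allocate on-heap instead of off-heap.
      .set("spark.shuffle.io.preferDirectBufs", "false")
      // Illustrative: fail fast with an OOM instead of being killed by YARN
      // if direct buffers grow without bound.
      .set("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=256m")

None of these remove the root cause; in this thread the growth ultimately stopped after switching the collector from ParallelGC to CMS, as reported at the top.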