Glad to hear that. :)

On Thu, Jun 18, 2015 at 6:25 AM, Ji ZHANG <zhangj...@gmail.com> wrote:

> Hi,
>
> We switched from ParallelGC to CMS, and the symptom is gone.
>
> On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>
>> Hi,
>>
>> I set spark.shuffle.io.preferDirectBufs to false in SparkConf and this
>> setting can be seen in web ui's environment tab. But, it still eats memory,
>> i.e. -Xmx set to 512M but RES grows to 1.5G in half a day.
>>
>>
>> On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>>
>>> Could you set "spark.shuffle.io.preferDirectBufs" to false to turn off
>>> the off-heap allocation of netty?
>>>
>>> Best Regards,
>>> Shixiong Zhu
>>>
>>> 2015-06-03 11:58 GMT+08:00 Ji ZHANG <zhangj...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> Thanks for you information. I'll give spark1.4 a try when it's released.
>>>>
>>>> On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das <t...@databricks.com>
>>>> wrote:
>>>>
>>>>> Could you try it out with Spark 1.4 RC3?
>>>>>
>>>>> Also pinging, Cloudera folks, they may be aware of something.
>>>>>
>>>>> BTW, the way I have debugged memory leaks in the past is as follows.
>>>>>
>>>>> Run with a small driver memory, say 1 GB. Periodically (maybe a
>>>>> script), take snapshots of histogram and also do memory dumps. Say every
>>>>> hour. And then compare the difference between two histo/dumps that are few
>>>>> hours separated (more the better). Diffing histo is easy. Diff two dumps
>>>>> can be done in JVisualVM, it will show the diff in the objects that got
>>>>> added in the later dump. That makes it easy to debug what is not getting
>>>>> cleaned.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>> On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for you reply. Here's the top 30 entries of jmap -histo:live
>>>>>> result:
>>>>>>
>>>>>>  num     #instances         #bytes  class name
>>>>>> ----------------------------------------------
>>>>>>    1:         40802      145083848  [B
>>>>>>    2:         99264       12716112  <methodKlass>
>>>>>>    3:         99264       12291480  <constMethodKlass>
>>>>>>    4:          8472        9144816  <constantPoolKlass>
>>>>>>    5:          8472        7625192  <instanceKlassKlass>
>>>>>>    6:           186        6097824
>>>>>>  [Lscala.concurrent.forkjoin.ForkJoinTask;
>>>>>>    7:          7045        4804832  <constantPoolCacheKlass>
>>>>>>    8:        139168        4453376  java.util.HashMap$Entry
>>>>>>    9:          9427        3542512  <methodDataKlass>
>>>>>>   10:        141312        3391488
>>>>>>  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>>>>>   11:        135491        3251784  java.lang.Long
>>>>>>   12:         26192        2765496  [C
>>>>>>   13:           813        1140560  [Ljava.util.HashMap$Entry;
>>>>>>   14:          8997        1061936  java.lang.Class
>>>>>>   15:         16022         851384  [[I
>>>>>>   16:         16447         789456  java.util.zip.Inflater
>>>>>>   17:         13855         723376  [S
>>>>>>   18:         17282         691280  java.lang.ref.Finalizer
>>>>>>   19:         25725         617400  java.lang.String
>>>>>>   20:           320         570368
>>>>>>  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>>>>>>   21:         16066         514112
>>>>>>  java.util.concurrent.ConcurrentHashMap$HashEntry
>>>>>>   22:         12288         491520
>>>>>>  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
>>>>>>   23:         13343         426976
>>>>>>  java.util.concurrent.locks.ReentrantLock$NonfairSync
>>>>>>   24:         12288         396416
>>>>>>  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
>>>>>>   25:         16447         394728  java.util.zip.ZStreamRef
>>>>>>   26:           565         370080  [I
>>>>>>   27:           508         272288  <objArrayKlassKlass>
>>>>>>   28:         16233         259728  java.lang.Object
>>>>>>   29:           771         209232
>>>>>>  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
>>>>>>   30:          2524         192312  [Ljava.lang.Object;
>>>>>>
>>>>>> But as I mentioned above, the heap memory seems OK, the extra memory
>>>>>> is consumed by some off-heap data. I can't find a way to figure out what 
>>>>>> is
>>>>>> in there.
>>>>>>
>>>>>> Besides, I did some extra experiments, i.e. run the same program in
>>>>>> difference environments to test whether it has off-heap memory issue:
>>>>>>
>>>>>> spark1.0 + standalone = no
>>>>>> spark1.0 + yarn = no
>>>>>> spark1.3 + standalone = no
>>>>>> spark1.3 + yarn = yes
>>>>>>
>>>>>> I'm using CDH5.1, so the spark1.0 is provided by cdh, and
>>>>>> spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
>>>>>>
>>>>>> I could use spark1.0 + yarn, but I can't find a way to handle the
>>>>>> logs, level and rolling, so it'll explode the harddrive.
>>>>>>
>>>>>> Currently I'll stick to spark1.0 + standalone, until our ops team
>>>>>> decides to upgrade cdh.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> While you are running is it possible for you login into the YARN
>>>>>>> node and get histograms of live objects using "jmap -histo:live". That 
>>>>>>> may
>>>>>>> reveal something.
>>>>>>>
>>>>>>>
>>>>>>> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Unfortunately, they're still growing, both driver and executors.
>>>>>>>>
>>>>>>>> I run the same job with local mode, everything is fine.
>>>>>>>>
>>>>>>>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <
>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Can you replace your counting part with this?
>>>>>>>>>
>>>>>>>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info
>>>>>>>>> (rdd.count()))
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I wrote a simple test job, it only does very basic operations.
>>>>>>>>>> for example:
>>>>>>>>>>
>>>>>>>>>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>>>>>> Map(topic -> 1)).map(_._2)
>>>>>>>>>>     val logs = lines.flatMap { line =>
>>>>>>>>>>       try {
>>>>>>>>>>         Some(parse(line).extract[Impression])
>>>>>>>>>>       } catch {
>>>>>>>>>>         case _: Exception => None
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>>>>>>>       rdd.foreachPartition { iter =>
>>>>>>>>>>         iter.foreach(count => logger.info(count.toString))
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> It receives messages from Kafka, parse the json, filter and count
>>>>>>>>>> the records, and then print it to logs.
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <
>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Zhang,
>>>>>>>>>>>
>>>>>>>>>>> Could you paste your code in a gist? Not sure what you are doing
>>>>>>>>>>> inside the code to fill up memory.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, I'm using createStream, but the storageLevel param is by
>>>>>>>>>>>> default MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also
>>>>>>>>>>>> growing. I don't think Kafka messages will be cached in driver.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <
>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Are you using the createStream or createDirectStream api? If
>>>>>>>>>>>>> its the former, you can try setting the StorageLevel to 
>>>>>>>>>>>>> MEMORY_AND_DISK (it
>>>>>>>>>>>>> might slow things down though). Another way would be to try the 
>>>>>>>>>>>>> later one.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com
>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Akhil,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your reply. Accoding to the Streaming tab of Web
>>>>>>>>>>>>>> UI, the Processing Time is around 400ms, and there's no 
>>>>>>>>>>>>>> Scheduling Delay,
>>>>>>>>>>>>>> so I suppose it's not the Kafka messages that eat up the 
>>>>>>>>>>>>>> off-heap memory.
>>>>>>>>>>>>>> Or maybe it is, but how to tell?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I googled about how to check the off-heap memory usage,
>>>>>>>>>>>>>> there's a tool called pmap, but I don't know how to interprete 
>>>>>>>>>>>>>> the results.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <
>>>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> After submitting the job, if you do a ps aux | grep
>>>>>>>>>>>>>>> spark-submit then you can see all JVM params. Are you using the 
>>>>>>>>>>>>>>> highlevel
>>>>>>>>>>>>>>> consumer (receiver based) for receiving data from Kafka? In 
>>>>>>>>>>>>>>> that case if
>>>>>>>>>>>>>>> your throughput is high and the processing delay exceeds batch 
>>>>>>>>>>>>>>> interval
>>>>>>>>>>>>>>> then you will hit this memory issues as the data will keep on 
>>>>>>>>>>>>>>> receiving and
>>>>>>>>>>>>>>> is dumped to memory. You can set StorageLevel to 
>>>>>>>>>>>>>>> MEMORY_AND_DISK (but it
>>>>>>>>>>>>>>> slows things down). Another alternate will be to use the 
>>>>>>>>>>>>>>> lowlevel
>>>>>>>>>>>>>>> kafka consumer
>>>>>>>>>>>>>>> <https://github.com/dibbhatt/kafka-spark-consumer> or to
>>>>>>>>>>>>>>> use the non-receiver based directStream
>>>>>>>>>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>>>>>>>>>> that comes up with spark.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <
>>>>>>>>>>>>>>> zhangj...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster
>>>>>>>>>>>>>>>> mode. I find out that YARN is killing the driver and executor 
>>>>>>>>>>>>>>>> process
>>>>>>>>>>>>>>>> because of excessive use of memory. Here's something I tried:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per
>>>>>>>>>>>>>>>> 10s), so the extra memory is not used by heap.
>>>>>>>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (default is
>>>>>>>>>>>>>>>> 384), but the memory just keeps growing and then hits the 
>>>>>>>>>>>>>>>> limit.
>>>>>>>>>>>>>>>> 3. This problem is not shown in low-throughput jobs,
>>>>>>>>>>>>>>>> neither in standalone mode.
>>>>>>>>>>>>>>>> 4. The test job just receives messages from Kafka, with
>>>>>>>>>>>>>>>> batch interval of 1, do some filtering and aggregation, and 
>>>>>>>>>>>>>>>> then print to
>>>>>>>>>>>>>>>> executor logs. So it's not some 3rd party library that causes 
>>>>>>>>>>>>>>>> the 'leak'.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Spark 1.3 is built by myself, with correct hadoop versions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Jerry
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Jerry
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Jerry
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Jerry
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jerry
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jerry
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jerry
>>>>
>>>
>>>
>>
>>
>> --
>> Jerry
>>
>
>
>
> --
> Jerry
>

Reply via email to