pig-user  

Re: PigServer memory leak

Ashutosh Chauhan
Wed, 10 Mar 2010 10:55:39 -0800

[Low Memory Detector] [INFO] SpillableMemoryManager.java:143 low memory
handler called

Are you seeing this warning on the client side, in the Pig logs? If so,
are you sure your job is actually running on a real Hadoop cluster?
These messages should appear in the task-tracker logs, not in the
client logs, so seeing them on the client may mean your job is being
executed locally (in local mode) rather than submitted to the cluster.
Look at the very first lines of the client logs, where Pig tries to
connect to the cluster, and check whether it succeeds.
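Below is a minimal sketch of the local-vs-cluster check. It assumes Hadoop 0.20-era settings. In those versions, `mapred.job.tracker` set to `local` (its shipped default) makes Hadoop run jobs in-process instead of submitting them to a JobTracker. The class name is hypothetical. Pig's own exec type (local vs. mapreduce) is a separate switch that this sketch does not inspect.

```java
import java.util.Properties;

// Hypothetical helper; assumes Hadoop 0.20-era property names.
final class ExecutionModeCheck {
    /** Property naming the JobTracker ("host:port"), or "local" for in-process runs. */
    static final String JOB_TRACKER_KEY = "mapred.job.tracker";

    private ExecutionModeCheck() {}

    /** True if MapReduce jobs built from these settings would run in the client JVM. */
    static boolean runsLocally(Properties conf) {
        // The shipped default is "local", so a missing key also means local execution.
        String tracker = conf.getProperty(JOB_TRACKER_KEY, "local").trim();
        return "local".equals(tracker);
    }
}
```

Suppose this returns true for the settings your client actually uses. In that case, the "low memory" messages showing up in the client log would be expected.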



On Wed, Mar 10, 2010 at 10:15, Ashutosh Chauhan
<ashutosh.chau...@gmail.com> wrote:
> Posting for Bill.
>
>
> ---------- Forwarded message ----------
> From: Bill Graham <billgra...@gmail.com>
> Date: Wed, Mar 10, 2010 at 10:11
> Subject: Re: PigServer memory leak
> To: ashutosh.chau...@gmail.com
>
>
> Thanks for the reply, Ashutosh.
>
> [hadoop.apache.org keeps flagging my reply as spam, so I'm replying
> directly to you. Feel free to push this conversation back onto the
> list, if you can. :)]
>
> I'm running the same two scripts, one after the other, every 5
> minutes. The scripts have dynamic tokens substituted to change the
> input and output directories. Besides that, they have the same logic.
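The token substitution described above could look roughly like this sketch. It assumes tokens are upper-case names wrapped in `@...@`, as in the scripts below. `ScriptTemplate` and `substitute` are hypothetical names, not part of Pig.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for filling @TOKEN@ placeholders in a Pig script template.
final class ScriptTemplate {
    private static final Pattern TOKEN = Pattern.compile("@([A-Z_]+)@");

    private ScriptTemplate() {}

    /** Replaces every @NAME@ with values.get("NAME"); fails on tokens with no value. */
    static String substitute(String template, Map<String, String> values) {
        Matcher m = TOKEN.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = values.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for token @" + m.group(1) + "@");
            }
            // quoteReplacement stops '$' or '\' in paths being read as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Failing on an unknown token is a design choice. It surfaces a missing value before the script reaches Pig, rather than storing output under a literal `@...@` path.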
>
> I will try to execute the script from grunt the next time it happens,
> but I don't see how a lack of Pig MR optimizations could cause a
> memory issue on the client. If I bounce my daemon, the next jobs to
> run execute without a problem right after startup, so I would expect
> a script run through grunt at that time to run without a problem as
> well.
>
> I reverted to re-initializing PigServer for every run. There are
> other places in my scheduled workflow where I interact with HDFS, and
> I've now modified them to re-use a single instance of Hadoop's
> Configuration object for the life of the VM; I was re-initializing it
> many times per run. Looking at the Configuration code, it seems to
> re-parse the XML configs into a DOM every time a new instance is
> loaded, so this certainly looks like a place for a potential leak. If
> nothing else, it should give me an optimization. Configuration seems
> to be stateless and read-only after initialization, so this seems safe.
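One way to keep a single configuration object for the life of the VM is the initialization-on-demand holder idiom, sketched below. `ParsedConfig` is a hypothetical stand-in for Hadoop's `Configuration`, so the example runs without Hadoop on the classpath. The counter exists only to make the parse-once behaviour visible. The real `Configuration` has setters, so sharing one instance is only safe if no caller mutates it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive-to-build, effectively read-only config.
final class ParsedConfig {
    static final AtomicInteger PARSE_COUNT = new AtomicInteger();

    ParsedConfig() {
        // Represents the costly work of reading and parsing XML resources.
        PARSE_COUNT.incrementAndGet();
    }
}

final class SharedConfig {
    private SharedConfig() {}

    // Initialization-on-demand holder: the JVM runs Holder's static initializer
    // exactly once, lazily and thread-safely, on the first call to get().
    private static final class Holder {
        static final ParsedConfig INSTANCE = new ParsedConfig();
    }

    static ParsedConfig get() {
        return Holder.INSTANCE;
    }
}
```

Callers would use `SharedConfig.get()` wherever they previously constructed a new configuration.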
>
> Anyway, here are my two scripts. The first generates summaries, the
> second makes a report from the summaries and they run in separate
> PigServer instances via registerQuery(..). Let me know if you see
> anything that seems off:
>
>
> define chukwaLoader org.apache.hadoop.chukwa.ChukwaStorage();
> define tokenize     com.foo.hadoop.mapreduce.pig.udf.TOKENIZE();
> define regexMatch   com.foo.hadoop.mapreduce.pig.udf.REGEX_MATCH();
> define timePeriod   org.apache.hadoop.chukwa.TimePartition('@TIME_PERIOD@');
>
> raw = LOAD '@HADOOP_INPUT_LOCATION@'
>     USING chukwaLoader AS (ts: long, fields);
> bodies = FOREACH raw GENERATE
>     tokenize((chararray)fields#'body') as tokens, timePeriod(ts) as time;
>
> -- pull values out of the URL
> tokens1 = FOREACH bodies GENERATE
>       (int)regexMatch($0.token4, '(?:[?&])ptId=([^&]*)', 1) as pageTypeId,
>       (int)regexMatch($0.token4, '(?:[?&])sId=([^&]*)', 1) as siteId,
>       (int)regexMatch($0.token4, '(?:[?&])aId=([^&]*)', 1) as assetId, time,
>       regexMatch($0.token4, '(?:[?&])tag=([^&]*)', 1) as tagValue;
>
> -- filter out entries without an assetId, pageTypeId or siteId
> tokens2 = FILTER tokens1 BY
>     (assetId is not null) AND (pageTypeId is not null) AND (siteId is not null);
>
> -- group by tagValue, time, assetId, pageTypeId and siteId, then flatten to get counts
> grouped = GROUP tokens2 BY (tagValue, time, assetId, pageTypeId, siteId);
> flattened = FOREACH grouped GENERATE
>     FLATTEN(group) as (tagValue, time, assetId, pageTypeId, siteId),
>     COUNT(tokens2) as count;
>
> shifted = FOREACH flattened GENERATE
>     time, count, assetId, pageTypeId, siteId, tagValue;
>
> -- order and store
> ordered = ORDER shifted BY tagValue ASC, count DESC, assetId DESC,
>     pageTypeId ASC, siteId ASC, time DESC;
> STORE ordered INTO '@HADOOP_OUTPUT_LOCATION@';
>
>
>
>
>
> raw = LOAD '@HADOOP_INPUT_LOCATION@' USING PigStorage('\t') AS
>     (ts: long, count: int, assetId: int, pageTypeId: chararray,
>      siteId: int, tagValue: chararray);
>
> -- now store most popular overall - filtered by pageTypeId
> most_popular_filtered = FILTER raw BY
>     (siteId == 162) AND
>     (pageTypeId matches '(2100)|(1606)|(1801)|(2300)|(2718)');
> most_popular = GROUP most_popular_filtered BY (ts, assetId, pageTypeId);
> most_popular_flattened = FOREACH most_popular GENERATE
>     FLATTEN(group) as (ts, assetId, pageTypeId),
>     SUM(most_popular_filtered.count) as count;
> most_popular_shifted = FOREACH most_popular_flattened
>     GENERATE ts, count, assetId, (int)pageTypeId;
>
> most_popular_ordered = ORDER most_popular_shifted
>     BY ts DESC, count DESC, assetId ASC, pageTypeId ASC;
> STORE most_popular_ordered INTO
>     '@HADOOP_OUTPUT_LOCATION@/most_popular_overall/';
>
> -- now store most popular by pagetype - filtered by pageTypeId
> STORE most_popular_ordered INTO
>     '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype_temp' USING
>     com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
>         '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype/', '3', 'none', '\t');
>
> -- now store most popular by tags - not filtered by pageTypeId
> most_popular_by_tag_filtered = FILTER raw BY
>     (siteId == 162) AND (tagValue is not null);
> most_popular_by_tag = GROUP most_popular_by_tag_filtered BY
>     (ts, assetId, pageTypeId, tagValue);
> most_popular_by_tag_flattened = FOREACH most_popular_by_tag
>     GENERATE FLATTEN(group) as (ts, assetId, pageTypeId, tagValue),
>     SUM(most_popular_by_tag_filtered.count) as count;
> most_popular_by_tag_shifted = FOREACH most_popular_by_tag_flattened
>     GENERATE ts, count, assetId, (int)pageTypeId, tagValue;
> most_popular_by_tag_ordered = ORDER most_popular_by_tag_shifted
>     BY ts DESC, tagValue ASC, count DESC, assetId ASC, pageTypeId ASC;
> STORE most_popular_by_tag_ordered INTO
>     '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag_temp' USING
>     com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
>         '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag/', '4', 'none', '\t');
>
> On Tue, Mar 9, 2010 at 9:13 PM, Ashutosh Chauhan
> <ashutosh.chau...@gmail.com> wrote:
>>
>> PigServer maintains some static state, and in the current
>> implementation it is not safe to reuse it across different queries.
>> You should create a new instance for every query.
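Below is a sketch of the create-per-query pattern recommended here. `ScriptEngine` is a hypothetical interface standing in for PigServer, and its method names are illustrative, not PigServer's actual API. The point is only that each run obtains a new instance from a factory rather than reusing a cached one.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a per-script engine such as PigServer.
interface ScriptEngine {
    void registerQuery(String statement);
    void execute();
}

final class PerRunExecutor {
    private final Supplier<? extends ScriptEngine> factory;

    PerRunExecutor(Supplier<? extends ScriptEngine> factory) {
        this.factory = factory;
    }

    /** Creates a fresh engine, feeds it every statement, then lets it go out of scope. */
    void run(Iterable<String> statements) {
        ScriptEngine engine = factory.get(); // new instance for this run only
        for (String statement : statements) {
            engine.registerQuery(statement);
        }
        engine.execute();
    }
}
```

A scheduler would hold one `PerRunExecutor` and call `run(...)` per job. No engine outlives the job that created it, although any static state inside the real class is unaffected by this pattern.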
>>
>> As for the memory leak: are you running exactly the same query over
>> the same dataset repeatedly? If so, and you run out of memory, then
>> there is a memory leak somewhere. But I doubt that's what you are
>> doing. More likely, the message you are seeing has nothing to do with
>> PigServer and is caused by the query and/or the dataset; that is, your
>> query may not be taking advantage of Pig's optimizations. When you see
>> that message, run the same query using grunt or bin/pig and you should
>> see the same messages. Send the query you are running; there might be
>> a way to optimize it and avoid those messages.
>>
>> Hope that helps,
>> Ashutosh
>>
>> On Tue, Mar 9, 2010 at 11:42, Bill Graham <billgra...@gmail.com> wrote:
>> > Actually, upon closer investigation, re-using PigServer isn't working as
>> > well as I thought. I'm digging into the issue.
>> >
>> > To step back a bit, though, I want to pose a different question: what is the
>> > intended usage of PigServer and PigContext with respect to their scope? Should
>> > a new instance of each be used for every job, or is one or the other intended
>> > for re-use throughout the lifecycle of the VM instance?
>> >
>> > Digging into the code of PigServer it seems like it's intended to be used
>> > for a single script's execution only, but it's not entirely clear if that's
>> > the case.
>> >
>> >
>> >
>> > On Tue, Mar 9, 2010 at 9:29 AM, Bill Graham <billgra...@gmail.com> wrote:
>> >
>> >> hi,
>> >>
>> >> I've got a long-running daemon application that periodically kicks off Pig
>> >> jobs via Quartz (Pig version 0.4.0). It uses a wrapper class that
>> >> initializes an instance of PigServer before parsing and executing a Pig
>> >> script. As implemented, the app would leak memory, and after a while jobs
>> >> would fail to run, with messages like this appearing in the logs:
>> >>
>> >> [Low Memory Detector] [INFO] SpillableMemoryManager.java:143 low memory
>> >> handler called
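This is context for the log line above. As far as I understand it, Pig's SpillableMemoryManager is a JMX `NotificationListener` that the JVM calls when a heap pool crosses a usage threshold. In JDK 6-era HotSpot, those notifications are delivered on a thread named "Low Memory Detector". The sketch below shows that standard JDK mechanism with a hypothetical `LowMemoryWatcher` class. It is not Pig's code, and it only logs at the point where Pig would spill data to disk.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryNotificationInfo;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;

// Hypothetical class illustrating JDK memory-threshold notifications (not Pig's code).
final class LowMemoryWatcher implements NotificationListener {

    /**
     * Sets a usage threshold at `fraction` of the maximum on every heap pool that
     * supports one, then subscribes `listener`. Returns the number of pools armed.
     */
    static int install(double fraction, LowMemoryWatcher listener) {
        if (!(fraction > 0.0 && fraction <= 1.0)) {
            throw new IllegalArgumentException("fraction must be in (0, 1]: " + fraction);
        }
        int armed = 0;
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            MemoryUsage usage = pool.getUsage(); // null if the pool is no longer valid
            if (usage == null || pool.getType() != MemoryType.HEAP
                    || !pool.isUsageThresholdSupported() || usage.getMax() <= 0) {
                continue;
            }
            pool.setUsageThreshold((long) (usage.getMax() * fraction));
            armed++;
        }
        // The platform MemoryMXBean is the emitter of threshold notifications.
        NotificationEmitter emitter = (NotificationEmitter) ManagementFactory.getMemoryMXBean();
        emitter.addNotificationListener(listener, null, null);
        return armed;
    }

    @Override
    public void handleNotification(Notification notification, Object handback) {
        if (!MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED.equals(notification.getType())) {
            return; // ignore other notification types
        }
        MemoryNotificationInfo info =
                MemoryNotificationInfo.from((CompositeData) notification.getUserData());
        // Runs on the JVM's notification thread, in whichever JVM registered the listener.
        System.err.println("low memory handler called: " + info.getPoolName() + " " + info.getUsage());
        // A real handler would release memory here, e.g. by spilling cached data to disk.
    }
}
```

The listener fires in the JVM that registered it, so its message lands in that JVM's log. That fits the point made at the top of this thread: seeing it in the client log rather than the task logs hints at local execution.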
>> >>
>> >> To fix the issue, I created an instance of PigServer at application
>> >> initialization and I re-use that instance for all jobs for the life of the
>> >> daemon. Problem solved.
>> >>
>> >> So my question is, is this a bug in PigServer that it leaks memory when
>> >> multiple instances are created, or is that just improper use of the class?
>> >>
>> >> thanks,
>> >> Bill
>> >>
>> >
>