Bill Graham
Fri, 19 Mar 2010 10:34:51 -0700
I believe I've found the cause of my Pig memory leak, so I wanted to report back. I profiled my app after letting it run for a couple of days and found that the static toDelete Stack in the FileLocalizer class was growing over time without ever being flushed. I had thousands of HFile objects in that stack. This produced a leak both in my app's memory and in HDFS, since the temp files they pointed to were never deleted.
The fix in my app seems straightforward enough. I suspect that calling
FileLocalizer.deleteTempFiles() after each Pig script is executed through
PigServer will do the trick.
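To make the failure mode concrete, here is a self-contained model, not Pig's code: a static stack stands in for FileLocalizer's toDelete, every run pushes a temp path onto it, and the cleanup sits in a finally block so it also runs when a script fails. TempRegistry and ScriptRunner are hypothetical names; in the real daemon the finally block would call FileLocalizer.deleteTempFiles().

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for FileLocalizer: a JVM-wide (static) registry of temp paths.
final class TempRegistry {
    static final Deque<String> TO_DELETE = new ArrayDeque<>();

    private TempRegistry() {}

    static void register(String path) {
        TO_DELETE.push(path);
    }

    // Stand-in for FileLocalizer.deleteTempFiles(): drain the stack.
    // A real implementation would also delete each path from the file system.
    static void deleteTempFiles() {
        while (!TO_DELETE.isEmpty()) {
            TO_DELETE.pop();
        }
    }
}

// Hypothetical wrapper that runs one script per call.
final class ScriptRunner {
    private static int runCounter = 0;

    static void runScript(boolean cleanUp) {
        try {
            // Each execution leaves temp output behind on the static stack.
            TempRegistry.register("/tmp/temp-" + (runCounter++));
            // ... parse and execute the script here ...
        } finally {
            if (cleanUp) {
                TempRegistry.deleteTempFiles(); // real app: FileLocalizer.deleteTempFiles()
            }
        }
    }
}
```

Without the cleanup the static stack grows on every run, no matter how many wrapper instances are created and discarded, because the stack belongs to the class rather than to any instance.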
This seems to be a major gotcha, though, and it will likely burn others. I
suggest we add a call to FileLocalizer.deleteTempFiles() in the shutdown()
method of PigServer. Thoughts?
Currently, shutdown() doesn't do much:
public void shutdown() {
    // clean-up activities
    // TODO: reclaim scope to free up resources. Currently
    // this is not implemented and throws an exception
    // hence, for now, we won't call it.
    //
    // pigContext.getExecutionEngine().reclaimScope(this.scope);
}
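The proposed change can be sketched as a self-contained model with hypothetical names (Localizer, ModelPigServer), not as a patch against PigServer: shutdown() keeps the existing TODO and additionally flushes the static temp-file stack. One caveat follows from the stack being static: flushing it in one instance's shutdown() also removes entries registered by every other instance in the same JVM, so this is only safe when scripts run one at a time, as in the scheduled setup described in this thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for FileLocalizer's static toDelete stack.
final class Localizer {
    static final Deque<String> TO_DELETE = new ArrayDeque<>();

    private Localizer() {}

    static void deleteTempFiles() {
        // A real implementation would also remove each path from HDFS.
        TO_DELETE.clear();
    }
}

// Hypothetical stand-in for PigServer, showing only the proposed shutdown().
final class ModelPigServer {
    private int queries = 0;

    void registerQuery(String query) {
        // Query parsing is omitted; executing a query leaves temp output on the static stack.
        Localizer.TO_DELETE.push("/tmp/temp-" + System.identityHashCode(this) + "-" + (queries++));
    }

    void shutdown() {
        // clean-up activities
        // TODO (unchanged): reclaim scope once reclaimScope() is implemented.
        Localizer.deleteTempFiles(); // the proposed addition
    }
}
```

The second half of the test below demonstrates the caveat: shutting down one instance also clears the entry that a second, still-live instance registered.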
thanks,
Bill
On Wed, Mar 10, 2010 at 12:15 PM, Bill Graham <billgra...@gmail.com> wrote:
> Yes, these errors appear in the Pig client and the jobs are definitely
> being executed on the cluster. I can see the data in HDFS and the jobs in
> the JobTracker UI of the cluster.
>
>
> On Wed, Mar 10, 2010 at 10:54 AM, Ashutosh Chauhan <
> ashutosh.chau...@gmail.com> wrote:
>
>> [Low Memory Detector] [INFO] SpillableMemoryManager.java:143 low memory handler called
>>
>> Are you seeing this warning on the client side, in the Pig logs? If so,
>> are you sure your job is actually running on a real Hadoop cluster? These
>> messages should appear in the task-tracker logs, not in the client logs.
>> Seeing them on the client may mean that your job is being executed in
>> local mode rather than actually being submitted to the cluster. Look at
>> the very first lines of the client log, where Pig tries to connect to the
>> cluster, and check whether it succeeds.
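For context on where that log line comes from: in the HotSpot JVMs of that time, [Low Memory Detector] is the name of the JVM thread that delivers java.lang.management memory-threshold notifications, and Pig's SpillableMemoryManager is a listener for those notifications. A listener only reacts to the heap of the JVM it is registered in, which is why the log it appears in matters for the diagnosis. The sketch below registers such a listener using only the standard JDK API; the class name, the 0.8 fraction and the "largest heap pool with threshold support" rule are assumptions for illustration, not Pig's actual settings.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryNotificationInfo;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;

final class LowMemoryWatch {
    private LowMemoryWatch() {}

    // Pick the largest heap pool that supports usage thresholds
    // (typically the old/tenured generation).
    static MemoryPoolMXBean findTenuredPool() {
        MemoryPoolMXBean best = null;
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() == MemoryType.HEAP && pool.isUsageThresholdSupported()) {
                if (best == null || pool.getUsage().getMax() > best.getUsage().getMax()) {
                    best = pool;
                }
            }
        }
        return best;
    }

    // Ask the JVM to notify us when the pool's usage crosses fraction * max.
    // Returns false if no suitable pool exists or its max size is undefined.
    static boolean install(double fraction, Runnable onLowMemory) {
        MemoryPoolMXBean pool = findTenuredPool();
        if (pool == null) {
            return false;
        }
        long max = pool.getUsage().getMax();
        if (max <= 0) {
            return false; // -1 means "undefined", so no fraction can be computed
        }
        pool.setUsageThreshold((long) (max * fraction));

        // The platform MemoryMXBean also implements NotificationEmitter.
        NotificationEmitter emitter = (NotificationEmitter) ManagementFactory.getMemoryMXBean();
        NotificationListener listener = (notification, handback) -> {
            if (MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED.equals(notification.getType())) {
                onLowMemory.run(); // e.g. log "low memory handler called" and spill
            }
        };
        emitter.addNotificationListener(listener, null, null);
        return true;
    }
}
```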
>>
>>
>>
>> On Wed, Mar 10, 2010 at 10:15, Ashutosh Chauhan
>> <ashutosh.chau...@gmail.com> wrote:
>> > Posting for Bill.
>> >
>> >
>> > ---------- Forwarded message ----------
>> > From: Bill Graham <billgra...@gmail.com>
>> > Date: Wed, Mar 10, 2010 at 10:11
>> > Subject: Re: PigServer memory leak
>> > To: ashutosh.chau...@gmail.com
>> >
>> >
>> > Thanks for the reply, Ashutosh.
>> >
>> > [hadoop.apache.org keeps flagging my reply as spam, so I'm replying
>> > directly to you. Feel free to push this conversation back onto the
>> > list, if you can. :)]
>> >
>> > I'm running the same two scripts, one after the other, every 5
>> > minutes. Dynamic tokens are substituted into the scripts to change the
>> > input and output directories; apart from that, the logic is identical on
>> > every run.
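The thread doesn't show how the tokens get substituted. Assuming plain textual replacement of @NAME@ placeholders, like the @HADOOP_INPUT_LOCATION@ and @TIME_PERIOD@ tokens in the scripts quoted below, a minimal helper could look like this. The class name and the fail-on-leftover-token check are illustrative choices, not taken from Bill's wrapper.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: replaces @NAME@ placeholders in a script template.
final class ScriptTemplate {
    // Matches any @TOKEN@ placeholder still left in the script.
    private static final Pattern LEFTOVER = Pattern.compile("@[A-Z0-9_]+@");

    private ScriptTemplate() {}

    static String substitute(String template, Map<String, String> tokens) {
        String script = template;
        for (Map.Entry<String, String> e : tokens.entrySet()) {
            script = script.replace("@" + e.getKey() + "@", e.getValue());
        }
        // Fail fast instead of handing Pig a path that still contains a placeholder.
        Matcher m = LEFTOVER.matcher(script);
        if (m.find()) {
            throw new IllegalArgumentException("unsubstituted token: " + m.group());
        }
        return script;
    }
}
```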
>> >
>> > I will try to execute the script from grunt the next time it happens, but
>> > I don't see how a lack of Pig MR optimizations could cause a memory
>> > issue on the client. If I bounce my daemon, the next jobs run without a
>> > problem right after start-up, so I would also expect a script run through
>> > grunt at that point to run without a problem.
>> >
>> > I reverted to re-initializing PigServer for every run. There are other
>> > places in my scheduled workflow where I interact with HDFS, and I've now
>> > modified those to re-use a single instance of Hadoop's Configuration
>> > object for the life of the VM; previously I was creating a new one many
>> > times per run. Looking at the Configuration code, it seems to re-parse the
>> > XML configs into a DOM each time a new one is created, so this looks like
>> > a potential source of the leak. If nothing else, it should be a worthwhile
>> > optimization. Configuration seems to be stateless and read-only after
>> > initialization, so this seems safe.
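Re-using one instance for the life of the VM is a lazy-singleton problem. Below is a minimal sketch using the initialization-on-demand holder idiom; Settings is a hypothetical stand-in for Hadoop's Configuration (so the sketch compiles without Hadoop), and the counter exists only to show that the expensive construction happens once. Note that Configuration does expose setters such as set(name, value), so sharing one instance is only safe as long as nothing mutates it after start-up.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SharedConfig {
    // Counts how many times the expensive object was built (demonstration only).
    static final AtomicInteger BUILD_COUNT = new AtomicInteger();

    // Hypothetical stand-in for Hadoop's Configuration, whose construction
    // Bill observed re-parsing the XML config files.
    static final class Settings {
        Settings() {
            BUILD_COUNT.incrementAndGet();
        }
    }

    private SharedConfig() {}

    // The JVM runs Holder's static initializer exactly once, lazily, on the
    // first call to get(), and class initialization is thread-safe.
    private static final class Holder {
        static final Settings INSTANCE = new Settings();
    }

    static Settings get() {
        return Holder.INSTANCE;
    }
}
```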
>> >
>> > Anyway, here are my two scripts. The first generates summaries and the
>> > second builds a report from those summaries; they run in separate
>> > PigServer instances via registerQuery(..). Let me know if you see
>> > anything that seems off:
>> >
>> >
>> > define chukwaLoader org.apache.hadoop.chukwa.ChukwaStorage();
>> > define tokenize com.foo.hadoop.mapreduce.pig.udf.TOKENIZE();
>> > define regexMatch com.foo.hadoop.mapreduce.pig.udf.REGEX_MATCH();
>> > define timePeriod org.apache.hadoop.chukwa.TimePartition('@TIME_PERIOD@');
>> >
>> > raw = LOAD '@HADOOP_INPUT_LOCATION@'
>> >   USING chukwaLoader AS (ts: long, fields);
>> > bodies = FOREACH raw GENERATE
>> >   tokenize((chararray)fields#'body') as tokens, timePeriod(ts) as time;
>> >
>> > -- pull values out of the URL
>> > tokens1 = FOREACH bodies GENERATE
>> >   (int)regexMatch($0.token4, '(?:[?&])ptId=([^&]*)', 1) as pageTypeId,
>> >   (int)regexMatch($0.token4, '(?:[?&])sId=([^&]*)', 1) as siteId,
>> >   (int)regexMatch($0.token4, '(?:[?&])aId=([^&]*)', 1) as assetId,
>> >   time,
>> >   regexMatch($0.token4, '(?:[?&])tag=([^&]*)', 1) as tagValue;
>> >
>> > -- filter out entries without an assetId, pageTypeId or siteId
>> > tokens2 = FILTER tokens1 BY
>> >   (assetId is not null) AND (pageTypeId is not null) AND (siteId is not null);
>> >
>> > -- group by tagValue, time, assetId, pageTypeId and siteId, then flatten to get counts
>> > grouped = GROUP tokens2 BY (tagValue, time, assetId, pageTypeId, siteId);
>> > flattened = FOREACH grouped GENERATE
>> >   FLATTEN(group) as (tagValue, time, assetId, pageTypeId, siteId),
>> >   COUNT(tokens2) as count;
>> >
>> > shifted = FOREACH flattened GENERATE time, count, assetId, pageTypeId,
>> >   siteId, tagValue;
>> >
>> > -- order and store
>> > ordered = ORDER shifted BY tagValue ASC, count DESC, assetId DESC,
>> >   pageTypeId ASC, siteId ASC, time DESC;
>> > STORE ordered INTO '@HADOOP_OUTPUT_LOCATION@';
>> >
>> >
>> >
>> >
>> >
>> > raw = LOAD '@HADOOP_INPUT_LOCATION@' USING PigStorage('\t') AS
>> >   (ts: long, count: int, assetId: int, pageTypeId: chararray,
>> >   siteId: int, tagValue: chararray);
>> >
>> > -- now store most popular overall - filtered by pageTypeId
>> > most_popular_filtered = FILTER raw BY
>> >   (siteId == 162) AND (pageTypeId matches '(2100)|(1606)|(1801)|(2300)|(2718)');
>> > most_popular = GROUP most_popular_filtered BY (ts, assetId, pageTypeId);
>> > most_popular_flattened = FOREACH most_popular GENERATE
>> >   FLATTEN(group) as (ts, assetId, pageTypeId),
>> >   SUM(most_popular_filtered.count) as count;
>> > most_popular_shifted = FOREACH most_popular_flattened
>> >   GENERATE ts, count, assetId, (int)pageTypeId;
>> >
>> > most_popular_ordered = ORDER most_popular_shifted
>> >   BY ts DESC, count DESC, assetId ASC, pageTypeId ASC;
>> > STORE most_popular_ordered INTO
>> >   '@HADOOP_OUTPUT_LOCATION@/most_popular_overall/';
>> >
>> > -- now store most popular by pagetype - filtered by pageTypeId
>> > STORE most_popular_ordered INTO
>> >   '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype_temp' USING
>> >   com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
>> >     '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype/', '3', 'none', '\t');
>> >
>> > -- now store most popular by tags - not filtered by pageTypeId
>> > most_popular_by_tag_filtered = FILTER raw BY
>> >   (siteId == 162) AND (tagValue is not null);
>> > most_popular_by_tag = GROUP most_popular_by_tag_filtered BY
>> >   (ts, assetId, pageTypeId, tagValue);
>> > most_popular_flattened = FOREACH most_popular_by_tag
>> >   GENERATE FLATTEN(group) as (ts, assetId, pageTypeId, tagValue),
>> >   SUM(most_popular_by_tag_filtered.count) as count;
>> > most_popular_by_tag_shifted = FOREACH most_popular_flattened
>> >   GENERATE ts, count, assetId, (int)pageTypeId, tagValue;
>> > most_popular_by_tag_ordered = ORDER most_popular_by_tag_shifted
>> >   BY ts DESC, tagValue ASC, count DESC, assetId ASC, pageTypeId ASC;
>> > STORE most_popular_by_tag_ordered INTO
>> >   '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag_temp' USING
>> >   com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
>> >     '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag/', '4', 'none', '\t');
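How the wrapper hands a multi-statement script like these to registerQuery(..) isn't shown in the thread. If it makes one call per statement, it needs a splitter that ignores ';' inside single-quoted strings (such as the regexes and '\t' above) and skips -- comments. The following is a minimal hypothetical sketch; the class name is illustrative, and escape handling covers only backslash escapes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a Pig Latin script into ';'-terminated statements.
final class PigStatementSplitter {
    private PigStatementSplitter() {}

    // Ignores ';' inside single-quoted strings and drops "--" line comments.
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        int n = script.length();
        for (int i = 0; i < n; i++) {
            char c = script.charAt(i);
            if (inQuote) {
                current.append(c);
                if (c == '\\' && i + 1 < n) {
                    current.append(script.charAt(++i)); // keep escaped char, e.g. '\t'
                } else if (c == '\'') {
                    inQuote = false;
                }
            } else if (c == '\'') {
                inQuote = true;
                current.append(c);
            } else if (c == '-' && i + 1 < n && script.charAt(i + 1) == '-') {
                while (i + 1 < n && script.charAt(i + 1) != '\n') {
                    i++; // skip the rest of the comment line
                }
            } else if (c == ';') {
                String stmt = current.toString().trim();
                if (!stmt.isEmpty()) {
                    statements.add(stmt + ";");
                }
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        String tail = current.toString().trim();
        if (!tail.isEmpty()) {
            statements.add(tail); // unterminated final statement
        }
        return statements;
    }
}
```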
>> >
>> > On Tue, Mar 9, 2010 at 9:13 PM, Ashutosh Chauhan
>> > <ashutosh.chau...@gmail.com> wrote:
>> >>
>> >> PigServer maintains some static state, and in the current implementation
>> >> it is not safe to reuse it across different queries. You should create a
>> >> new instance for every query.
>> >>
>> >> As for the memory leak: are you running exactly the same query over the
>> >> same dataset repeatedly? If so, and you run out of memory, then there is
>> >> a memory leak somewhere. But I doubt that's what you are doing. More
>> >> likely, the message you are seeing has nothing to do with PigServer and
>> >> is caused by the query and/or the dataset; that is, your query may not be
>> >> taking advantage of Pig's optimizations. When you see that message, run
>> >> the same query using grunt or bin/pig, and you should see the same
>> >> messages. Send the query you are running; there might be a way to
>> >> optimize it and avoid those messages.
>> >>
>> >> Hope that helps,
>> >> Ashutosh
>> >>
>> >> On Tue, Mar 9, 2010 at 11:42, Bill Graham <billgra...@gmail.com> wrote:
>> >> > Actually, upon closer investigation, re-using PigServer isn't working as
>> >> > well as I thought. I'm digging into the issue.
>> >> >
>> >> > To step back a bit, though, I want to pose a different question: what is
>> >> > the intended usage of PigServer and PigContext w.r.t. their scope? Should
>> >> > a new instance of each be used for every job, or is one or the other
>> >> > intended for re-use throughout the lifecycle of the VM instance?
>> >> >
>> >> > Digging into the code of PigServer, it seems like it's intended to be used
>> >> > for a single script's execution only, but it's not entirely clear if that's
>> >> > the case.
>> >> >
>> >> >
>> >> >
>> >> > On Tue, Mar 9, 2010 at 9:29 AM, Bill Graham <billgra...@gmail.com> wrote:
>> >> >
>> >> >> hi,
>> >> >>
>> >> >> I've got a long-running daemon application that periodically kicks off
>> >> >> Pig jobs via Quartz (Pig version 0.4.0). It uses a wrapper class that
>> >> >> initializes an instance of PigServer before parsing and executing a Pig
>> >> >> script. As implemented, the app would leak memory, and after a while jobs
>> >> >> would fail to run, with messages like this appearing in the logs:
>> >> >>
>> >> >> [Low Memory Detector] [INFO] SpillableMemoryManager.java:143 low memory handler called
>> >> >>
>> >> >> To fix the issue, I created an instance of PigServer at application
>> >> >> initialization, and I re-use that instance for all jobs for the life of
>> >> >> the daemon. Problem solved.
>> >> >>
>> >> >> So my question is: is it a bug in PigServer that it leaks memory when
>> >> >> multiple instances are created, or is that just improper use of the class?
>> >> >>
>> >> >> thanks,
>> >> >> Bill
>> >> >>
>> >> >
>> >
>>
>
>