Ashutosh Chauhan
Wed, 10 Mar 2010 10:16:26 -0800
Posting for Bill.
---------- Forwarded message ----------
From: Bill Graham <billgra...@gmail.com>
Date: Wed, Mar 10, 2010 at 10:11
Subject: Re: PigServer memory leak
To: ashutosh.chau...@gmail.com
Thanks for the reply, Ashutosh.
[hadoop.apache.org keeps flagging my reply as spam, so I'm replying
directly to you. Feel free to push this conversation back onto the
list, if you can. :)]
I'm running the same two scripts, one after the other, every 5
minutes. The scripts have dynamic tokens substituted to change the
input and output directories. Besides that, they have the same logic.
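A minimal sketch of the token substitution described above, assuming the tokens always have the form @UPPER_CASE_NAME@ as they do in the scripts below. The class ScriptTokens and its substitute method are hypothetical names for illustration and are not part of Pig; the sketch needs Java 9 or later.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Pig): fills @NAME@ placeholders in a script template.
final class ScriptTokens {
    // Assumption: token names are upper-case letters and underscores only.
    private static final Pattern TOKEN = Pattern.compile("@([A-Z_]+)@");

    private ScriptTokens() {}

    static String substitute(String template, Map<String, String> values) {
        Matcher m = TOKEN.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = values.get(m.group(1));
            if (value == null) {
                // Fail fast rather than submit a script with an unresolved path.
                throw new IllegalArgumentException("No value for token " + m.group());
            }
            // quoteReplacement keeps '$' and '\' in the value from being interpreted.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String line = "STORE ordered INTO '@HADOOP_OUTPUT_LOCATION@';";
        // The output path here is a made-up example value.
        System.out.println(substitute(line, Map.of("HADOOP_OUTPUT_LOCATION", "/tmp/summaries/run1")));
    }
}
```

Throwing on an unknown token is a design choice for the sketch: a script with a leftover @TOKEN@ would otherwise fail later, inside the job.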
I will try to execute the script from grunt the next time it happens, but
I don't see how a lack of Pig MR optimizations could cause a memory
issue on the client. If I bounce my daemon, the next jobs to run
execute without a problem upon start, so I would also expect a script
run through grunt at that time to run without a problem as well.
I reverted to re-initializing PigServer for every run. I have
other places in my scheduled workflow where I interact with HDFS, which
I've now modified to re-use a single instance of Hadoop's Configuration
object for the life of the VM. I was re-initializing that many times
per run. Looking at the Configuration code, it seems to re-parse the
XML configs into a DOM every time it's called, so this certainly looks
like a place for a potential leak. If nothing else, it should give me
an optimization. Configuration seems to be stateless and read-only
after initialization, so this seems safe.
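A rough sketch of the "create once, re-use for the life of the VM" idea, written with the standard library only so it runs without Hadoop on the classpath. SharedInstance is a hypothetical name; in the daemon the factory would presumably be something like Configuration::new, and the approach assumes nothing calls setters on the shared object after startup.

```java
import java.util.function.Supplier;

// Hypothetical holder (not a Hadoop or Pig class): builds an expensive object
// on first use and hands out that same instance on every later call.
final class SharedInstance<T> {
    private final Supplier<T> factory;
    private volatile T instance; // volatile is required for safe double-checked locking

    SharedInstance(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    local = factory.get(); // runs at most once
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        // StringBuilder stands in for the expensive object in this demo.
        SharedInstance<StringBuilder> shared = new SharedInstance<>(StringBuilder::new);
        System.out.println(shared.get() == shared.get());
    }
}
```

The locking matters only if several scheduler threads can ask for the object at the same time; with a single-threaded scheduler a plain static final field would do the same job.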
Anyway, here are my two scripts. The first generates summaries, the
second makes a report from the summaries and they run in separate
PigServer instances via registerQuery(..). Let me know if you see
anything that seems off:
define chukwaLoader org.apache.hadoop.chukwa.ChukwaStorage();
define tokenize com.foo.hadoop.mapreduce.pig.udf.TOKENIZE();
define regexMatch com.foo.hadoop.mapreduce.pig.udf.REGEX_MATCH();
define timePeriod org.apache.hadoop.chukwa.TimePartition('@TIME_PERIOD@');
raw = LOAD '@HADOOP_INPUT_LOCATION@'
USING chukwaLoader AS (ts: long, fields);
bodies = FOREACH raw GENERATE tokenize((chararray)fields#'body') as
tokens, timePeriod(ts) as time;
-- pull values out of the URL
tokens1 = FOREACH bodies GENERATE
(int)regexMatch($0.token4, '(?:[?&])ptId=([^&]*)', 1) as pageTypeId,
(int)regexMatch($0.token4, '(?:[?&])sId=([^&]*)', 1) as siteId,
(int)regexMatch($0.token4, '(?:[?&])aId=([^&]*)', 1) as assetId, time,
regexMatch($0.token4, '(?:[?&])tag=([^&]*)', 1) as tagValue;
-- filter out entries without an assetId, pageTypeId or siteId
tokens2 = FILTER tokens1 BY
(assetId is not null) AND (pageTypeId is not null) AND (siteId is not null);
-- group by tagValue, time, assetId, pageTypeId, siteId and flatten to get counts
grouped = GROUP tokens2 BY (tagValue, time, assetId, pageTypeId, siteId);
flattened = FOREACH grouped GENERATE
FLATTEN(group) as (tagValue, time, assetId, pageTypeId, siteId),
COUNT(tokens2) as count;
shifted = FOREACH flattened GENERATE time, count, assetId, pageTypeId,
siteId, tagValue;
-- order and store
ordered = ORDER shifted BY tagValue ASC, count DESC, assetId DESC,
pageTypeId ASC, siteId ASC, time DESC;
STORE ordered INTO '@HADOOP_OUTPUT_LOCATION@';
-- second script: build the reports from the summaries
raw = LOAD '@HADOOP_INPUT_LOCATION@' USING PigStorage('\t') AS
(ts: long, count: int, assetId: int, pageTypeId: chararray,
siteId: int, tagValue: chararray);
-- now store most popular overall - filtered by pageTypeId
most_popular_filtered = FILTER raw BY
(siteId == 162) AND (pageTypeId matches
'(2100)|(1606)|(1801)|(2300)|(2718)');
most_popular = GROUP most_popular_filtered BY (ts, assetId, pageTypeId);
most_popular_flattened = FOREACH most_popular GENERATE
FLATTEN(group) as (ts, assetId, pageTypeId),
SUM(most_popular_filtered.count) as count;
most_popular_shifted = FOREACH most_popular_flattened
GENERATE ts, count, assetId, (int)pageTypeId;
most_popular_ordered = ORDER most_popular_shifted
BY ts DESC, count DESC, assetId ASC, pageTypeId ASC;
STORE most_popular_ordered INTO
'@HADOOP_OUTPUT_LOCATION@/most_popular_overall/';
-- now store most popular by pagetype - filtered by pageTypeId
STORE most_popular_ordered INTO
'@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype_temp' USING
com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
'@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype/', '3', 'none', '\t');
-- now store most popular by tags - not filtered by pageTypeId
most_popular_by_tag_filtered = FILTER raw BY
(siteId == 162) AND (tagValue is not null);
most_popular_by_tag = GROUP most_popular_by_tag_filtered BY (ts,
assetId, pageTypeId, tagValue);
most_popular_flattened = FOREACH most_popular_by_tag
GENERATE FLATTEN(group) as (ts, assetId, pageTypeId, tagValue),
SUM(most_popular_by_tag_filtered.count) as count;
most_popular_by_tag_shifted = FOREACH most_popular_flattened
GENERATE ts, count, assetId, (int)pageTypeId, tagValue;
most_popular_by_tag_ordered = ORDER most_popular_by_tag_shifted
BY ts DESC, tagValue ASC, count DESC, assetId ASC, pageTypeId ASC;
STORE most_popular_by_tag_ordered INTO
'@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag_temp' USING
com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
'@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag/', '4', 'none', '\t');
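To make the URL patterns in the first script easier to check, here is a small stand-alone sketch of what they capture. REGEX_MATCH is a custom UDF whose source is not shown, so the regexMatch method below is only an assumed stand-in (return the requested group of the first match, or null when nothing matches), and the sample URL is made up.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class UrlParamDemo {
    private UrlParamDemo() {}

    // Assumed behaviour of the REGEX_MATCH UDF: the given capture group of the
    // first match, or null if the input is null or the pattern does not match.
    static String regexMatch(String input, String regex, int group) {
        if (input == null) {
            return null;
        }
        Matcher m = Pattern.compile(regex).matcher(input);
        return m.find() ? m.group(group) : null;
    }

    public static void main(String[] args) {
        String url = "/page?ptId=2100&sId=162&aId=98765"; // made-up sample value
        // [?&] anchors the name to the start of a parameter; ([^&]*) captures
        // everything up to the next '&' or the end of the string.
        System.out.println(regexMatch(url, "(?:[?&])ptId=([^&]*)", 1)); // 2100
        System.out.println(regexMatch(url, "(?:[?&])sId=([^&]*)", 1));  // 162
        System.out.println(regexMatch(url, "(?:[?&])aId=([^&]*)", 1));  // 98765
        System.out.println(regexMatch(url, "(?:[?&])tag=([^&]*)", 1));  // null
    }
}
```

Under that assumption, a URL with no tag parameter yields a null tagValue, which the first script keeps and the second script drops in its "tagValue is not null" filter.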
On Tue, Mar 9, 2010 at 9:13 PM, Ashutosh Chauhan
<ashutosh.chau...@gmail.com> wrote:
>
> PigServer maintains some static state, and in the current implementation it
> is not safe to reuse it across different queries. You should create a
> new instance for every query.
>
> As for the memory leak: are you running exactly the same query over the same
> dataset repeatedly? If yes, and you run out of memory, then there is a
> memory leak somewhere. But I doubt that's what you are doing. More
> likely, the message you are seeing has nothing to do with PigServer
> and is because of the query and/or dataset. That is, your query may not
> be taking advantage of optimizations in Pig. When you see that
> message, run the same query using grunt or bin/pig and you should see the same
> messages. Send the query you are firing; there might be a way to
> optimize it and avoid those messages.
>
> Hope that helps,
> Ashutosh
>
> On Tue, Mar 9, 2010 at 11:42, Bill Graham <billgra...@gmail.com> wrote:
> > Actually, upon closer investigation, re-using PigServer isn't working as
> > well as I thought. I'm digging into the issue.
> >
> > To step back a bit though, I want to pose a different question: What is the
> > intended usage of PigServer and PigContext w.r.t. its scope? Should a new
> > instance of each be used for every job or is one or the other intended for
> > re-use throughout the lifecycle of the VM instance?
> >
> > Digging into the code of PigServer it seems like it's intended to be used
> > for a single script's execution only, but it's not entirely clear if that's
> > the case.
> >
> >
> >
> > On Tue, Mar 9, 2010 at 9:29 AM, Bill Graham <billgra...@gmail.com> wrote:
> >
> >> hi,
> >>
> >> I've got a long running daemon application that periodically kicks off Pig
> >> jobs via quartz (Pig version 0.4.0). It uses a wrapper class that initializes
> >> an instance of PigServer before parsing and executing a pig script. As
> >> implemented, the app would leak memory and after a while jobs would fail to
> >> run with messages like this appearing in the logs:
> >>
> >> [Low Memory Detector] [INFO] SpillableMemoryManager.java:143 low memory
> >> handler called
> >>
> >> To fix the issue, I created an instance of PigServer at application
> >> initialization and I re-use that instance for all jobs for the life of the
> >> daemon. Problem solved.
> >>
> >> So my question is, is this a bug in PigServer that it leaks memory when
> >> multiple instances are created, or is that just improper use of the class?
> >>
> >> thanks,
> >> Bill
> >>
> >