Bill Graham
Wed, 10 Mar 2010 11:15:26 -0800
Yes, these errors appear in the Pig client and the jobs are definitely being executed on the cluster. I can see the data in HDFS and the jobs in the JobTracker UI of the cluster.
On Wed, Mar 10, 2010 at 10:54 AM, Ashutosh Chauhan <ashutosh.chau...@gmail.com> wrote:
> [Low Memory Detector] [INFO] SpillableMemoryManager.java:143 low memory handler called
>
> Are you seeing this warning on the client side, in the Pig logs? If so,
> are you sure your job is actually running on a real Hadoop cluster?
> These messages should appear in the task-tracker logs, not in the client
> logs. This may mean that your job is being executed locally, in local
> mode, and is not actually being submitted to the cluster. Look at the
> very first lines of the client logs, where Pig tries to connect to the
> cluster, and see whether it succeeds.
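For anyone trying to tell which JVM is actually under memory pressure: the JDK's java.lang.management API lets any JVM ask to be notified when a heap pool is still above a threshold after garbage collection. Pig's SpillableMemoryManager appears to build on this mechanism, and the [Low Memory Detector] tag is likely the name of the JVM thread that delivered such notifications on older HotSpot versions. The following is a hypothetical standalone watcher, not Pig code; installing it in the daemon would show whether the client JVM itself is running low. The 0.8 fraction is an arbitrary example value.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryNotificationInfo;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;

// Hypothetical watcher: logs when a heap pool is still above `fraction`
// of its maximum after a GC. If this fires inside the client daemon,
// the memory pressure is on the client, not on the task trackers.
class LowMemoryWatcher {

    /** Configures thresholds and a listener; returns the number of heap pools configured. */
    static int install(double fraction) {
        int configured = 0;
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            long max = pool.getUsage().getMax(); // -1 when the pool has no defined max
            if (pool.getType() == MemoryType.HEAP
                    && pool.isCollectionUsageThresholdSupported()
                    && max > 0) {
                pool.setCollectionUsageThreshold((long) (max * fraction));
                configured++;
            }
        }
        // The platform MemoryMXBean is documented to implement NotificationEmitter.
        NotificationEmitter emitter =
                (NotificationEmitter) ManagementFactory.getMemoryMXBean();
        NotificationListener listener = (notification, handback) -> {
            if (MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED
                    .equals(notification.getType())) {
                System.err.println("[client JVM] heap above threshold after GC: "
                        + notification.getMessage());
            }
        };
        emitter.addNotificationListener(listener, null, null);
        return configured;
    }

    public static void main(String[] args) {
        int pools = install(0.8);
        System.out.println("watching " + pools + " heap pool(s)");
    }
}
```

Using the collection-usage threshold rather than the plain usage threshold avoids alarms for garbage that the next GC would have reclaimed anyway.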
>
>
>
> On Wed, Mar 10, 2010 at 10:15, Ashutosh Chauhan
> <ashutosh.chau...@gmail.com> wrote:
> > Posting for Bill.
> >
> >
> > ---------- Forwarded message ----------
> > From: Bill Graham <billgra...@gmail.com>
> > Date: Wed, Mar 10, 2010 at 10:11
> > Subject: Re: PigServer memory leak
> > To: ashutosh.chau...@gmail.com
> >
> >
> > Thanks for the reply, Ashutosh.
> >
> > [hadoop.apache.org keeps flagging my reply as spam, so I'm replying
> > directly to you. Feel free to push this conversation back onto the
> > list, if you can. :)]
> >
> > I'm running the same two scripts, one after the other, every 5
> > minutes. The scripts have dynamic tokens substituted to change the
> > input and output directories. Besides that, they have the same logic.
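A minimal sketch of the token substitution described above, assuming the tokens are upper-case names wrapped in @ signs, as in the scripts below (@HADOOP_INPUT_LOCATION@, @TIME_PERIOD@). ScriptTemplate and expand are hypothetical names; the expanded text would then be handed to whatever wrapper registers the script with Pig.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: replaces @NAME@ tokens in a Pig script template
// with per-run values (input/output directories, time period, ...).
class ScriptTemplate {
    private static final Pattern TOKEN = Pattern.compile("@([A-Z_]+)@");

    static String expand(String template, Map<String, String> values) {
        Matcher m = TOKEN.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            String value = values.get(name);
            if (value == null) {
                // Fail fast rather than run a script with a literal @TOKEN@ path.
                throw new IllegalArgumentException("no value for @" + name + "@");
            }
            // quoteReplacement stops '$' or '\' in a value from being
            // interpreted as a regex group reference.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Failing on a missing value is a deliberate choice: a script that silently loads from a literal '@HADOOP_INPUT_LOCATION@' path is harder to diagnose than an exception at substitution time.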
> >
> > I will try to execute the script from grunt the next time it happens,
> > but I don't see how a lack of Pig MR optimizations could cause a memory
> > issue on the client. If I bounce my daemon, the next job to run
> > executes without a problem right after startup, so I would also expect
> > a script run through grunt at that time to run without a problem.
> >
> > I reverted to re-initializing PigServer for every run. There are other
> > places in my scheduled workflow where I interact with HDFS, and I've now
> > modified those to re-use a single instance of Hadoop's Configuration
> > object for the life of the VM; I was re-initializing one many times per
> > run. Looking at the Configuration code, it seems to re-parse the XML
> > configs into a DOM for every new instance, so this certainly looks like
> > a place for a potential leak. If nothing else, it should give me an
> > optimization. Configuration seems to be stateless and read-only after
> > initialization, so this seems safe.
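A sketch of the parse-once, share-for-the-life-of-the-VM pattern described above, using the initialization-on-demand holder idiom. ParsedConfig is a hypothetical stand-in for an expensive, read-only settings object, not Hadoop's Configuration, and the property value is a placeholder. One caveat: Hadoop's Configuration does expose setters, so sharing a single instance is only safe as long as no caller modifies it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive-to-build, read-only config object.
final class ParsedConfig {
    static final AtomicInteger PARSE_COUNT = new AtomicInteger();
    private final Map<String, String> props;

    private ParsedConfig() {
        PARSE_COUNT.incrementAndGet(); // stands in for re-reading the XML files
        Map<String, String> p = new HashMap<>();
        p.put("fs.default.name", "hdfs://namenode:9000"); // placeholder value
        props = Collections.unmodifiableMap(p);
    }

    String get(String key) {
        return props.get(key);
    }

    // Initialization-on-demand holder: the JVM builds INSTANCE lazily,
    // exactly once, and publishes it safely to every thread.
    private static final class Holder {
        static final ParsedConfig INSTANCE = new ParsedConfig();
    }

    static ParsedConfig shared() {
        return Holder.INSTANCE;
    }
}
```

The holder idiom gives lazy, thread-safe initialization without explicit locking, which suits a daemon whose Quartz jobs may run on several threads.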
> >
> > Anyway, here are my two scripts. The first generates summaries, the
> > second makes a report from the summaries and they run in separate
> > PigServer instances via registerQuery(..). Let me know if you see
> > anything that seems off:
> >
> >
> > define chukwaLoader org.apache.hadoop.chukwa.ChukwaStorage();
> > define tokenize com.foo.hadoop.mapreduce.pig.udf.TOKENIZE();
> > define regexMatch com.foo.hadoop.mapreduce.pig.udf.REGEX_MATCH();
> > define timePeriod org.apache.hadoop.chukwa.TimePartition('@TIME_PERIOD@');
> >
> > raw = LOAD '@HADOOP_INPUT_LOCATION@'
> > USING chukwaLoader AS (ts: long, fields);
> > bodies = FOREACH raw GENERATE tokenize((chararray)fields#'body') as
> > tokens, timePeriod(ts) as time;
> >
> > -- pull values out of the URL
> > tokens1 = FOREACH bodies GENERATE
> > (int)regexMatch($0.token4, '(?:[?&])ptId=([^&]*)', 1) as pageTypeId,
> > (int)regexMatch($0.token4, '(?:[?&])sId=([^&]*)', 1) as siteId,
> > (int)regexMatch($0.token4, '(?:[?&])aId=([^&]*)', 1) as assetId, time,
> > regexMatch($0.token4, '(?:[?&])tag=([^&]*)', 1) as tagValue;
> >
> > -- filter out entries without an assetId, pageTypeId or siteId
> > tokens2 = FILTER tokens1 BY
> > (assetId is not null) AND (pageTypeId is not null) AND (siteId is not null);
> >
> > -- group by tagValue, time, assetId, pageTypeId and siteId, then flatten to get counts
> > grouped = GROUP tokens2 BY (tagValue, time, assetId, pageTypeId, siteId);
> > flattened = FOREACH grouped GENERATE
> > FLATTEN(group) as (tagValue, time, assetId, pageTypeId, siteId),
> > COUNT(tokens2) as count;
> >
> > shifted = FOREACH flattened GENERATE time, count, assetId, pageTypeId,
> > siteId, tagValue;
> >
> > -- order and store
> > ordered = ORDER shifted BY tagValue ASC, count DESC, assetId DESC,
> > pageTypeId ASC, siteId ASC, time DESC;
> > STORE ordered INTO '@HADOOP_OUTPUT_LOCATION@';
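The tokens1 step above relies on the author's REGEX_MATCH UDF, whose source is not shown. Judging from the call sites, it returns one capture group of a regex match against the request URL. A minimal Java sketch of that behavior, assuming first-match semantics and a null result when nothing matches (both are assumptions, and RegexMatch is a hypothetical name):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for a REGEX_MATCH-style UDF: returns capture
// group `group` of the first match of `regex` in `text`, or null.
class RegexMatch {
    static String regexMatch(String text, String regex, int group) {
        if (text == null) {
            return null; // mirror Pig's null-in, null-out convention
        }
        // Compiles on every call for brevity; a real UDF would likely cache it.
        Matcher m = Pattern.compile(regex).matcher(text);
        return m.find() ? m.group(group) : null;
    }
}
```

The (?:[?&]) prefix in each pattern ties the parameter name to the start of a query-string parameter, so sId= does not match inside a longer name such as xsId=, and a null result is what lets the later FILTER drop rows with a missing assetId, pageTypeId or siteId.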
> >
> >
> >
> >
> >
> > raw = LOAD '@HADOOP_INPUT_LOCATION@' USING PigStorage('\t') AS
> > (ts: long, count: int, assetId: int, pageTypeId: chararray,
> > siteId: int, tagValue: chararray);
> >
> > -- now store most popular overall - filtered by pageTypeId
> > most_popular_filtered = FILTER raw BY
> > (siteId == 162) AND (pageTypeId matches
> > '(2100)|(1606)|(1801)|(2300)|(2718)');
> > most_popular = GROUP most_popular_filtered BY (ts, assetId, pageTypeId);
> > most_popular_flattened = FOREACH most_popular GENERATE
> > FLATTEN(group) as (ts, assetId, pageTypeId),
> > SUM(most_popular_filtered.count) as count;
> > most_popular_shifted = FOREACH most_popular_flattened
> > GENERATE ts, count, assetId, (int)pageTypeId;
> >
> > most_popular_ordered = ORDER most_popular_shifted
> > BY ts DESC, count DESC, assetId ASC, pageTypeId ASC;
> > STORE most_popular_ordered INTO
> > '@HADOOP_OUTPUT_LOCATION@/most_popular_overall/';
> >
> > -- now store most popular by pagetype - filtered by pageTypeId
> > STORE most_popular_ordered INTO
> > '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype_temp' USING
> > com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
> > '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype/', '3', 'none', '\t');
> >
> > -- now store most popular by tags - not filtered by pageTypeId
> > most_popular_by_tag_filtered = FILTER raw BY
> > (siteId == 162) AND (tagValue is not null);
> > most_popular_by_tag = GROUP most_popular_by_tag_filtered BY (ts,
> > assetId, pageTypeId, tagValue);
> > most_popular_by_tag_flattened = FOREACH most_popular_by_tag
> > GENERATE FLATTEN(group) as (ts, assetId, pageTypeId, tagValue),
> > SUM(most_popular_by_tag_filtered.count) as count;
> > most_popular_by_tag_shifted = FOREACH most_popular_by_tag_flattened
> > GENERATE ts, count, assetId, (int)pageTypeId, tagValue;
> > most_popular_by_tag_ordered = ORDER most_popular_by_tag_shifted
> > BY ts DESC, tagValue ASC, count DESC, assetId ASC, pageTypeId ASC;
> > STORE most_popular_by_tag_ordered INTO
> > '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag_temp' USING
> > com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
> > '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag/', '4', 'none', '\t');
> >
> > On Tue, Mar 9, 2010 at 9:13 PM, Ashutosh Chauhan
> > <ashutosh.chau...@gmail.com> wrote:
> >>
> >> PigServer maintains some static state, and in the current implementation
> >> it is not safe to reuse it across different queries. You should create a
> >> new instance for every query.
> >>
> >> As for the memory leak: are you running exactly the same query over the
> >> same dataset repeatedly? If so, and you run out of memory, then there is
> >> a memory leak somewhere. But I doubt that's what you are doing. More
> >> likely, the message you are seeing has nothing to do with PigServer and
> >> is caused by the query and/or the dataset. That is, your query may not
> >> be taking advantage of Pig's optimizations. When you see that message,
> >> run the same query using grunt or bin/pig and you should see the same
> >> messages. Send the query you are running; there might be a way to
> >> optimize it and avoid those messages.
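The experiment suggested above can be scripted. A minimal sketch, assuming the job is wrapped in a Runnable and that a fresh instance is built for every run (the Supplier stands in for whatever constructs the PigServer wrapper; LeakCheck is a hypothetical name): run the same job on the same input repeatedly and record heap usage after requesting a GC.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical harness: repeat one job on one input, building a fresh
// runner each time, and sample heap usage after requesting a GC.
class LeakCheck {

    static List<Long> heapAfterEachRun(Supplier<Runnable> freshRunner, int runs) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        List<Long> samples = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            freshRunner.get().run(); // new instance per run, no reference kept
            memory.gc();             // a request to collect, not a guarantee
            samples.add(memory.getHeapMemoryUsage().getUsed());
        }
        return samples;
    }

    public static void main(String[] args) {
        // In the daemon, the Runnable would build a PigServer, register the
        // script and wait for it to finish; here it is an empty placeholder.
        List<Long> used = heapAfterEachRun(() -> () -> { }, 5);
        System.out.println("heap used after each run (bytes): " + used);
    }
}
```

A series that keeps climbing across many identical runs suggests a leak; a flat series points back at the query or the data. Because the GC call is only a request, individual samples are noisy, so the trend over many runs matters more than any single value.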
> >>
> >> Hope that helps,
> >> Ashutosh
> >>
> >> On Tue, Mar 9, 2010 at 11:42, Bill Graham <billgra...@gmail.com> wrote:
> >> > Actually, upon closer investigation, re-using PigServer isn't working as
> >> > well as I thought. I'm digging into the issue.
> >> >
> >> > To step back a bit, though, I want to pose a different question: what is
> >> > the intended usage of PigServer and PigContext with respect to their
> >> > scope? Should a new instance of each be used for every job, or is one or
> >> > the other intended for re-use throughout the lifecycle of the VM instance?
> >> >
> >> > Digging into the code of PigServer, it seems like it's intended to be
> >> > used for a single script's execution only, but it's not entirely clear
> >> > if that's the case.
> >> >
> >> >
> >> >
> >> > On Tue, Mar 9, 2010 at 9:29 AM, Bill Graham <billgra...@gmail.com> wrote:
> >> >
> >> >> hi,
> >> >>
> >> >> I've got a long-running daemon application that periodically kicks off
> >> >> Pig jobs via Quartz (Pig version 0.4.0). It uses a wrapper class that
> >> >> initializes an instance of PigServer before parsing and executing a Pig
> >> >> script. As implemented, the app would leak memory, and after a while
> >> >> jobs would fail to run, with messages like this appearing in the logs:
> >> >>
> >> >> [Low Memory Detector] [INFO] SpillableMemoryManager.java:143 low memory handler called
> >> >>
> >> >> To fix the issue, I created an instance of PigServer at application
> >> >> initialization, and I re-use that instance for all jobs for the life of
> >> >> the daemon. Problem solved.
> >> >>
> >> >> So my question is: is this a bug in PigServer, in that it leaks memory
> >> >> when multiple instances are created, or is that just improper use of
> >> >> the class?
> >> >>
> >> >> thanks,
> >> >> Bill
> >> >>
> >> >
> >
>