pig-user  

Re: Reducers slowing down? (UNCLASSIFIED)

Dmitriy Ryaboy
Thu, 11 Mar 2010 11:50:10 -0800

Can you check the task logs and see how the number of databag spills to disk
correlates with the number of tuples/bytes processed and the time a task
took? Sounds like there is some terrible skew going on, although cross
really shouldn't have that problem if it does what I think it should be
doing (which is probably wrong, I never use cross).

-D
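
A quick sanity check on the figures quoted below, as a minimal Python sketch. The numbers are the approximate ones Robert reported; the variable names are illustrative only and nothing here comes from the actual job logs.

```python
# Approximate figures taken from the quoted messages below.
people = 62_500_000            # |People|
actors = 8_000                 # |Actors|
cross_size = people * actors   # tuples the CROSS has to emit

# Reported progress: 100e9 records after 25 h, 140e9 records after 75 h.
first_rate = 100_000_000_000 / 25                                # records/hour, hours 0-25
later_rate = (140_000_000_000 - 100_000_000_000) / (75 - 25)     # records/hour, hours 25-75

print(f"cross size: {cross_size:,}")
print(f"first 25 h: {first_rate:,.0f} records/hour")
print(f"next 50 h:  {later_rate:,.0f} records/hour")
print(f"slowdown:   {first_rate / later_rate:.1f}x")
```

On these figures the average output rate fell about fivefold after the first 25 hours. That would be consistent with a few reducers carrying most of the remaining work, which the task logs should confirm or rule out.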

2010/3/11 Winkler, Robert (Civ, ARL/CISD) <robert.wink...@us.army.mil>

> Classification: UNCLASSIFIED
> Caveats: NONE
>
> Yeah, that didn't work either. Ran for 3 days and then failed because of
> "too many fetch failures". It seems to get about 2/3 of the way through the
> reducers (regardless of the number) reasonably quickly and then just stalls
> or fails.
>
> Anyway, I changed the script to SPLIT the People dataset into 26 subsets
> based on whether the first character matched a-z and crossed each of those
> subsets with the Actors relation. This resulted in 26 separate Pig jobs
> running in parallel (I went back to PARALLEL 30 so each had 30 reducers).
>
> That worked. The shortest job took 53 minutes and the longest 22.5 hours.
> But I'm not sure what to make of this other than I shouldn't try to process
> a 500,000,000,000-tuple relation.
>
> -- Register CMU's SecondString
> REGISTER /home/arl/Desktop/ARLDeveloper/JavaCOTS/SecondString/secondstring-20060615.jar;
> -- Register ARL's UDF SecondString Wrapper
> REGISTER /home/arl/Desktop/ARLDeveloper/JavaComponents/INSCOM/CandidateIdentification.jar;
> -- |People| ~ 62,500,000
> People = LOAD '/data/UniquePeoplePerStory' USING PigStorage(',') AS
> (file:chararray, name:chararray);
> -- Split People based on first character
> SPLIT People INTO A IF name MATCHES '^[aA].*', …. , Z IF name MATCHES
> '^[zZ].*';
> -- |Actors| ~ 8,000
> Actors = LOAD '/data/Actors' USING PigStorage(',') AS (actor:chararray);
> -- Process each split in parallel
> ToCompareA = CROSS Actors, A PARALLEL 30;
> AResults = FOREACH ToCompareA GENERATE $0, $1, $2,
> ARL.CandidateIdentificationUDF.Similarity($2, $0) ;
> STORE AResults INTO '/data/ScoredPeople/A' USING PigStorage(',');
> …
> ToCompareZ = CROSS Actors, Z PARALLEL 30;
> ZResults = FOREACH ToCompareZ GENERATE $0, $1, $2,
> ARL.CandidateIdentificationUDF.Similarity($2, $0) ;
> STORE ZResults INTO '/data/ScoredPeople/Z' USING PigStorage(',');
>
> -----Original Message-----
> From: Mridul Muralidharan [mailto:mrid...@yahoo-inc.com]
> Sent: Friday, March 05, 2010 9:39 PM
> To: pig-user@hadoop.apache.org
> Cc: Thejas Nair; Winkler, Robert (Civ, ARL/CISD)
> Subject: Re: Reducers slowing down? (UNCLASSIFIED)
>
> On Saturday 06 March 2010 04:47 AM, Thejas Nair wrote:
> > I am not sure why the rate at which output is generated is slowing down.
> > But cross in pig is not optimized - it uses only one reducer. (a major
> > limitation if you are trying to process lots of data with a large
> > cluster!)
>
>
> CROSS is not supposed to use a single reducer - GRCross is parallel in
> pig, last time we checked (a while back though).
> That it is parallel does not mean it is cheap; it is still pretty
> darn expensive.
>
> Given this, the suggestion below might not work?
>
>
> Robert, what about using a higher value of PARALLEL for CROSS? (much
> higher than the number of nodes, if required).
>
> Regards,
> Mridul
>
> >
> > You can try using skewed join instead - project a constant in both
> > streams and then join on that.
> >
> >
> > ToCompare = join Actors by 1, People by 1 using 'skewed' PARALLEL 30;
> >
> > I haven't tried this on a very large dataset; I am interested in knowing
> > how this compares if you try it out.
> >
> > -Thejas
> >
> >
> >
> >
> > On 3/5/10 9:48 AM, "Winkler, Robert  (Civ, ARL/CISD)"
> > <robert.wink...@us.army.mil>  wrote:
> >
> >> Classification: UNCLASSIFIED
> >>
> >> Caveats: NONE
> >>
> >> Hello, I'm using Pig 0.6.0, running the following script on a 27-datanode
> >> cluster running RedHat Enterprise 5.4:
> >>
> >> -- Holds the Pig UDF wrapper around the SecondString SoftTFIDF function
> >>
> >> REGISTER /home/CandidateIdentification.jar;
> >>
> >> -- SecondString itself
> >>
> >> REGISTER /home/secondstring-20060615.jar;
> >>
> >> -- |People| ~ 62,500,000 from the English GigaWord 4th Edition
> >>
> >> People = LOAD '/data/UniquePeoplePerStory' USING PigStorage(',') AS
> >> (file:chararray, name:chararray);
> >>
> >> -- |Actors| ~ 8,000 from the Stanford Movie Database
> >>
> >> Actors = LOAD '/data/Actors' USING PigStorage(',') AS (actor:chararray);
> >>
> >> -- |ToCompare| ~ 500,000,000,000
> >>
> >> ToCompare = CROSS Actors, People PARALLEL 30;
> >>
> >>
> >>
> >> -- Score 'em and store 'em
> >>
> >> Results = FOREACH ToCompare GENERATE $0, $1, $2,
> >> ARL.CandidateIdentificationUDF.Similarity($2, $0);
> >>
> >> STORE Results INTO '/data/ScoredPeople' USING PigStorage(',');
> >>
> >> The first 100,000,000,000 reduce output records were produced in some 25
> >> hours. But after 75 hours it has produced a total of 140,000,000,000 (instead
> >> of the 300,000,000,000 I was extrapolating) and seems to be producing them at
> >> a slower and slower rate. What is going on? Did I screw something up?
> >>
> >> Thanks,
> >>
> >> Robert
> >>
> >
> >
>
>
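
The a-z SPLIT described in the 11 March message above produced jobs that ran from 53 minutes to 22.5 hours, which suggests the first-letter buckets are very uneven. The Python sketch below shows one way to gauge that before launching 26 jobs: count names per first-letter bucket. The helper name `bucket_counts` and the sample names are hypothetical and not taken from the real People data. It also counts names that start with anything other than a-z; in Pig, a tuple that satisfies none of the SPLIT conditions is not written to any of the output relations, so such names would be left out of the results.

```python
import re
from collections import Counter

def bucket_counts(names):
    """Count names per first-letter bucket, mirroring the a-z SPLIT conditions."""
    counts = Counter()
    for name in names:
        m = re.match(r"[a-zA-Z]", name)   # first character must be an ASCII letter
        key = m.group(0).lower() if m else "(unmatched)"
        counts[key] += 1
    return counts

# Hypothetical sample, for illustration only.
sample = ["Smith", "smythe", "Stone", "Adams", "Zhou", "3Com spokesman", "Sanchez"]
counts = bucket_counts(sample)
for key, n in counts.most_common():
    print(key, n)
```

Run against a sample of the real names, the largest bucket gives a rough idea of which job will dominate the wall-clock time, and a non-zero "(unmatched)" count shows how many names the a-z conditions would skip.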