When I change the cogroup command to aggregate 3 filter results:

my_raw = LOAD './houred-small.txt' USING PigStorage('\t') AS (user, hour, query);
part1 = filter my_raw by hour > 11;
part2 = filter my_raw by hour < 13;
part3 = filter my_raw by hour < 15;
-- cogroup the three (overlapping) filter results on hour
result = cogroup part1 by hour, part2 by hour, part3 by hour;
dump result;
explain result;


I get the following output. It seems the number of map tasks depends on
the number of inputs to the cogroup command, and it looks like the input
file houred-small.txt has been read 3 times, presumably once per map
task (the Input(s) section below lists the same file three times). Am I
right? (I also sketch a SPLIT variant below, after the job DAG.)


HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      Features
0.20.2  0.9.3-SNAPSHOT  root    2012-03-09 11:14:31     2012-03-09 11:15:00     COGROUP,FILTER

Success!

Job Stats (time in seconds):
JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201203021230_0073   3       1       3       3       3       12      12      12      my_raw,part1,part2,part3,result COGROUP hdfs://master:54310/tmp/temp2038439399/tmp1117641349,

Input(s):
Successfully read 49 records from: "hdfs://master:54310/user/root/houred-small.txt"
Successfully read 49 records from: "hdfs://master:54310/user/root/houred-small.txt"
Successfully read 49 records from: "hdfs://master:54310/user/root/houred-small.txt"

Output(s):
Successfully stored 14 records (3547 bytes) in: "hdfs://master:54310/tmp/temp2038439399/tmp1117641349"

Counters:
Total records written : 14
Total bytes written : 3547
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_201203021230_0073
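
Since each extra cogroup input seems to cost an extra scan, I wonder
whether SPLIT would express the same thing over a single read. A sketch
of what I mean -- untested, and assuming SPLIT is not simply rewritten
back into three independent filters:

my_raw = LOAD './houred-small.txt' USING PigStorage('\t') AS (user, hour, query);
-- one scan of my_raw; each tuple goes to every branch whose condition
-- holds (branches may overlap, as hour < 13 and hour < 15 do here)
SPLIT my_raw INTO part1 IF hour > 11, part2 IF hour < 13, part3 IF hour < 15;
result = cogroup part1 by hour, part2 by hour, part3 by hour;
dump result;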

Thanks!


On Thu, Mar 8, 2012 at 10:51 PM, Yongzhi Wang <[email protected]> wrote:

> So should the two map tasks be running the same piece of code but
> reading different input? Or are the two tasks actually running
> different code?
> Is there any way that I can track the real map and reduce functions
> that Pig hands to the workers? (I sketch one guess below.)
> Or can you tell me which piece of source code in the Pig project
> generates the map and reduce tasks sent to the slave workers?
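>
> For example, I guess a grunt session like this would show the compiled
> plans (a sketch; I am assuming the explain options documented for Pig
> 0.9, and /tmp/plans is just a made-up path):
>
> grunt> explain result;                       -- logical, physical, MR plans
> grunt> explain -dot -out /tmp/plans result;  -- same plans in DOT form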
>
> Thanks!
>
>
>
> On Thu, Mar 8, 2012 at 8:23 PM, Dmitriy Ryaboy <[email protected]> wrote:
>
>> That's what I get for reading explain plans on an iPhone. Sorry.
>>
>> So, yeah, the cogrouping is happening as part of the shuffle.
>> It seems like Pig is planning a pipeline per relation, t1 and t2 (and
>> then a logical union of the two, which just indicates that tuples from
>> both relations go into the same meta-relation, tagged with their
>> source, which then gets cogrouped). It shouldn't; it should be able to
>> reuse the same scan of the source data for both t1 and t2.
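>>
>> If you want to check whether the scan gets shared, one thing to try is
>> comparing the plans with multi-query optimization on (the default) and
>> off. A sketch, assuming the -M / -no_multiquery switch is what governs
>> this case:
>>
>> $ pig        # optimization on (default); then in grunt: explain result;
>> $ pig -M     # -M / -no_multiquery turns it off; compare the two plans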
>>
>> D
>>
>> On Thu, Mar 8, 2012 at 9:13 AM, Yongzhi Wang <[email protected]> wrote:
>> > Thanks, Dmitriy. I understand that there is only one job, containing
>> > 2 map tasks and 1 reduce task. But the problem is that even though I
>> > have only one input file of about 1.4k (fewer than 50 rows of
>> > records), the stats still show that it needs 2 map tasks.
>> >
>> > The union operation is shown at the top of the Map plan tree:
>> > (Union[tuple] - scope-85)
>> >
>> >  #--------------------------------------------------
>> >  # Map Reduce Plan
>> >  #--------------------------------------------------
>> >  MapReduce node scope-84
>> >  Map Plan
>> >  Union[tuple] - scope-85
>> >  |
>> >  |---result: Local Rearrange[tuple]{bytearray}(false) - scope-73
>> >  |   |   |
>> >  |   |   Project[bytearray][1] - scope-74
>> >  |   |
>> >  |   |---part1: Filter[bag] - scope-59
>> >  |       |   |
>> >  |       |   Greater Than[boolean] - scope-63
>> >  |       |   |
>> >  |       |   |---Cast[int] - scope-61
>> >  |       |   |   |
>> >  |       |   |   |---Project[bytearray][1] - scope-60
>> >  |       |   |
>> >  |       |   |---Constant(11) - scope-62
>> >  |       |
>> >  |       |---my_raw: New For Each(false,false,false)[bag] - scope-89
>> >  |           |   |
>> >  |           |   Project[bytearray][0] - scope-86
>> >  |           |   |
>> >  |           |   Project[bytearray][1] - scope-87
>> >  |           |   |
>> >  |           |   Project[bytearray][2] - scope-88
>> >  |           |
>> >  |           |---my_raw: Load(hdfs://master:54310/user/root/houred-small:PigStorage('    ')) - scope-90
>> >  |
>> >  |---result: Local Rearrange[tuple]{bytearray}(false) - scope-75
>> >    |   |
>> >    |   Project[bytearray][1] - scope-76
>> >    |
>> >    |---part2: Filter[bag] - scope-66
>> >        |   |
>> >        |   Less Than[boolean] - scope-70
>> >        |   |
>> >        |   |---Cast[int] - scope-68
>> >        |   |   |
>> >        |   |   |---Project[bytearray][1] - scope-67
>> >        |   |
>> >        |   |---Constant(13) - scope-69
>> >        |
>> >        |---my_raw: New For Each(false,false,false)[bag] - scope-94
>> >            |   |
>> >            |   Project[bytearray][0] - scope-91
>> >            |   |
>> >            |   Project[bytearray][1] - scope-92
>> >            |   |
>> >            |   Project[bytearray][2] - scope-93
>> >            |
>> >            |---my_raw: Load(hdfs://master:54310/user/root/houred-small:PigStorage('    ')) - scope-95
>> >  --------
>> >  Reduce Plan
>> >  result: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-77
>> >  |
>> >  |---result: Package[tuple]{bytearray} - scope-72
>> >  --------
>> >  Global sort: false
>> >
>> >
>> > On Thu, Mar 8, 2012 at 1:14 AM, Dmitriy Ryaboy <[email protected]> wrote:
>> >
>> >> You are confusing map and reduce tasks with mapreduce jobs. Your pig
>> >> script resulted in a single mapreduce job. The number of map tasks
>> >> was 2, based on input size -- it has little to do with the actual
>> >> operators you used.
>> >>
>> >> There is no union operator involved, so I am not sure what you are
>> >> referring to with that.
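>> >>
>> >> (For a single scan, the map count tracks the number of input splits.
>> >> A hedged illustration, assuming grunt's set passes Hadoop properties
>> >> through to the job conf:
>> >>
>> >> set mapred.min.split.size 134217728;  -- larger splits, fewer maps
>> >>
>> >> With a 1.4k file there is only one split either way, of course.)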
>> >>
>> >> > On Mar 7, 2012, at 8:09 AM, Yongzhi Wang <[email protected]> wrote:
>> >>
>> >> > Hi, There
>> >> >
>> >> > I tried to use the "explain" syntax, but the MapReduce plan
>> >> > sometimes confuses me.
>> >> >
>> >> > I tried the script below:
>> >> >
>> >> > *my_raw = LOAD './houred-small' USING PigStorage('\t') AS (user, hour, query);
>> >> > part1 = filter my_raw by hour>11;
>> >> > part2 = filter my_raw by hour<13;
>> >> > result = cogroup part1 by hour, part2 by hour;
>> >> > dump result;
>> >> > explain result;*
>> >> >
>> >> > The job stats are shown below, indicating there are 2 Map tasks and
>> >> > 1 reduce task. But I don't know how the Map tasks map to the
>> >> > MapReduce plan shown below. It seems each Map task just does one
>> >> > filter and rearrange, but in which phase is the union operation
>> >> > done? The shuffle phase? If that is the case, the two Map tasks
>> >> > actually did different filter work. Is that possible? Or is my
>> >> > guess wrong?
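>> >> >
>> >> > A tiny worked example of what I expect the cogroup to produce, with
>> >> > made-up rows: if my_raw has hours 10, 12, 14, then part1 (hour > 11)
>> >> > keeps 12 and 14, part2 (hour < 13) keeps 10 and 12, and result gets
>> >> > one tuple per distinct key:
>> >> >
>> >> > (10, {}, {(u,10,q)})
>> >> > (12, {(u,12,q)}, {(u,12,q)})
>> >> > (14, {(u,14,q)}, {})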
>> >> >
>> >> > So, back to the question: *Is there any way that I can see the
>> >> > actual map and reduce tasks executed by Pig?*
>> >> >
>> >> > *Job Stats (time in seconds):
>> >> > JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
>> >> > job_201203021230_0038   2       1       3       3       3       12      12      12      my_raw,part1,part2,result       COGROUP hdfs://master:54310/tmp/temp626037557/tmp-1661404166,
>> >> > *
>> >> >
>> >> > The mapreduce plan is shown below:*
>> >> > #--------------------------------------------------
>> >> > # Map Reduce Plan
>> >> > #--------------------------------------------------
>> >> > MapReduce node scope-84
>> >> > Map Plan
>> >> > Union[tuple] - scope-85
>> >> > |
>> >> > |---result: Local Rearrange[tuple]{bytearray}(false) - scope-73
>> >> > |   |   |
>> >> > |   |   Project[bytearray][1] - scope-74
>> >> > |   |
>> >> > |   |---part1: Filter[bag] - scope-59
>> >> > |       |   |
>> >> > |       |   Greater Than[boolean] - scope-63
>> >> > |       |   |
>> >> > |       |   |---Cast[int] - scope-61
>> >> > |       |   |   |
>> >> > |       |   |   |---Project[bytearray][1] - scope-60
>> >> > |       |   |
>> >> > |       |   |---Constant(11) - scope-62
>> >> > |       |
>> >> > |       |---my_raw: New For Each(false,false,false)[bag] - scope-89
>> >> > |           |   |
>> >> > |           |   Project[bytearray][0] - scope-86
>> >> > |           |   |
>> >> > |           |   Project[bytearray][1] - scope-87
>> >> > |           |   |
>> >> > |           |   Project[bytearray][2] - scope-88
>> >> > |           |
>> >> > |           |---my_raw: Load(hdfs://master:54310/user/root/houred-small:PigStorage('    ')) - scope-90
>> >> > |
>> >> > |---result: Local Rearrange[tuple]{bytearray}(false) - scope-75
>> >> >    |   |
>> >> >    |   Project[bytearray][1] - scope-76
>> >> >    |
>> >> >    |---part2: Filter[bag] - scope-66
>> >> >        |   |
>> >> >        |   Less Than[boolean] - scope-70
>> >> >        |   |
>> >> >        |   |---Cast[int] - scope-68
>> >> >        |   |   |
>> >> >        |   |   |---Project[bytearray][1] - scope-67
>> >> >        |   |
>> >> >        |   |---Constant(13) - scope-69
>> >> >        |
>> >> >        |---my_raw: New For Each(false,false,false)[bag] - scope-94
>> >> >            |   |
>> >> >            |   Project[bytearray][0] - scope-91
>> >> >            |   |
>> >> >            |   Project[bytearray][1] - scope-92
>> >> >            |   |
>> >> >            |   Project[bytearray][2] - scope-93
>> >> >            |
>> >> >            |---my_raw: Load(hdfs://master:54310/user/root/houred-small:PigStorage('    ')) - scope-95
>> >> > --------
>> >> > Reduce Plan
>> >> > result: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-77
>> >> > |
>> >> > |---result: Package[tuple]{bytearray} - scope-72
>> >> > --------
>> >> > Global sort: false
>> >> > ----------------*
>> >> >
>> >> > Thanks!
>> >>
>>
>
>
