When I change the cogroup command to aggregate 3 filter results:
my_raw = LOAD './houred-small.txt' USING PigStorage('\t') AS (user, hour, query);
part1 = filter my_raw by hour>11;
part2 = filter my_raw by hour<13;
part3 = filter my_raw by hour<15;
result = cogroup part1 by hour, part2 by hour, part3 by hour;
dump result;
explain result;
I get the following output. It seems the number of map tasks depends on the
number of inputs to the cogroup command, and the input data houred-small.txt
appears to have been read 3 times, presumably once per map task. Am I right?
HadoopVersion   PigVersion      UserId  StartedAt            FinishedAt           Features
0.20.2          0.9.3-SNAPSHOT  root    2012-03-09 11:14:31  2012-03-09 11:15:00  COGROUP,FILTER
Success!
Job Stats (time in seconds):
JobId   Maps    Reduces MaxMapTime      MinMapTime      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201203021230_0073   3       1       3       3       3       12      12      12      my_raw,part1,part2,part3,result COGROUP hdfs://master:54310/tmp/temp2038439399/tmp1117641349,
Input(s):
Successfully read 49 records from:
"hdfs://master:54310/user/root/houred-small.txt"
Successfully read 49 records from:
"hdfs://master:54310/user/root/houred-small.txt"
Successfully read 49 records from:
"hdfs://master:54310/user/root/houred-small.txt"
Output(s):
Successfully stored 14 records (3547 bytes) in:
"hdfs://master:54310/tmp/temp2038439399/tmp1117641349"
Counters:
Total records written : 14
Total bytes written : 3547
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_201203021230_0073
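For what it's worth, here is a sketch of the same pipeline written with SPLIT
instead of three FILTER statements (same semantics; typing hour as int also
avoids the bytearray casts that show up in the plan). Whether the planner then
shares a single scan of houred-small.txt may depend on the Pig version, but
the intent (one load, three branches) is explicit:

my_raw = LOAD './houred-small.txt' USING PigStorage('\t')
         AS (user:chararray, hour:int, query:chararray);
-- SPLIT routes each input tuple to every branch whose condition it satisfies
SPLIT my_raw INTO part1 IF hour > 11, part2 IF hour < 13, part3 IF hour < 15;
result = COGROUP part1 BY hour, part2 BY hour, part3 BY hour;
dump result;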
Thanks!
On Thu, Mar 8, 2012 at 10:51 PM, Yongzhi Wang <[email protected]> wrote:
> So should the two map tasks be running the same piece of code, but reading
> different input? Or are the two tasks actually running different code?
> Is there any way that I can track the actual map and reduce functions that
> Pig ships to the workers?
> Or can you tell me which piece of source code in the Pig project generates
> the map and reduce tasks shipped to the slave workers?
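(One way to look, as a sketch assuming Pig 0.9's explain options: from the
Grunt shell,

explain -out /tmp/plans result;

writes the logical, physical, and MapReduce plans to files instead of stdout
(/tmp/plans is just an example path). On the source side, the MapReduce plan
is built under org.apache.pig.backend.hadoop.executionengine.mapReduceLayer:
MRCompiler compiles the physical plan into MapReduce operators, and
JobControlCompiler turns them into the Hadoop jobs shipped to the workers.)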
>
> Thanks!
>
>
>
> On Thu, Mar 8, 2012 at 8:23 PM, Dmitriy Ryaboy <[email protected]> wrote:
>
>> That's what I get for reading explain plans on an iphone. Sorry.
>>
>> So, yeah, the cogrouping is happening as part of the shuffle.
>> It seems like Pig is planning a task per t1 and t2 (and then a logical
>> union of the two, which just indicates that tuples from both relations
>> go into the same meta-relation, tagged with their source, which will
>> then get cogrouped). It shouldn't; it should be able to reuse the same
>> scan of the source data for both t1 and t2.
>>
>> D
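(Roughly, and only as a sketch of the idea rather than what Pig literally
executes, the shuffle behaves as if the script had been:

t1_tagged = FOREACH t1 GENERATE 0 AS src, hour;
t2_tagged = FOREACH t2 GENERATE 1 AS src, hour;
both = UNION t1_tagged, t2_tagged;   -- the "meta-relation"
grouped = GROUP both BY hour;        -- the shuffle key
-- the reducer then splits each group back into per-source bags to build
-- the cogrouped tuple: (group, {t1 tuples}, {t2 tuples})

with the src tag telling the reducer which bag each tuple belongs in.)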
>>
>> On Thu, Mar 8, 2012 at 9:13 AM, Yongzhi Wang <[email protected]>
>> wrote:
>> > Thanks, Dmitriy. I understand that there is only one job containing 2
>> > map tasks and 1 reduce task. But the problem is that even if I have only
>> > one input file of 1.4 KB (fewer than 50 records), the stats data still
>> > show that it needs 2 map tasks.
>> >
>> > The union operation is shown at the top of the Map plan tree:
>> > (Union[tuple] - scope-85)
>> >
>> > #--------------------------------------------------
>> > # Map Reduce Plan
>> > #--------------------------------------------------
>> > MapReduce node scope-84
>> > Map Plan
>> > Union[tuple] - scope-85
>> > |
>> > |---result: Local Rearrange[tuple]{bytearray}(false) - scope-73
>> > | | |
>> > | | Project[bytearray][1] - scope-74
>> > | |
>> > | |---part1: Filter[bag] - scope-59
>> > | | |
>> > | | Greater Than[boolean] - scope-63
>> > | | |
>> > | | |---Cast[int] - scope-61
>> > | | | |
>> > | | | |---Project[bytearray][1] - scope-60
>> > | | |
>> > | | |---Constant(11) - scope-62
>> > | |
>> > | |---my_raw: New For Each(false,false,false)[bag] - scope-89
>> > | | |
>> > | | Project[bytearray][0] - scope-86
>> > | | |
>> > | | Project[bytearray][1] - scope-87
>> > | | |
>> > | | Project[bytearray][2] - scope-88
>> > | |
>> > | |---my_raw:
>> > Load(hdfs://master:54310/user/root/houred-small:PigStorage(' ')) -
>> > scope-90
>> > |
>> > |---result: Local Rearrange[tuple]{bytearray}(false) - scope-75
>> > | |
>> > | Project[bytearray][1] - scope-76
>> > |
>> > |---part2: Filter[bag] - scope-66
>> > | |
>> > | Less Than[boolean] - scope-70
>> > | |
>> > | |---Cast[int] - scope-68
>> > | | |
>> > | | |---Project[bytearray][1] - scope-67
>> > | |
>> > | |---Constant(13) - scope-69
>> > |
>> > |---my_raw: New For Each(false,false,false)[bag] - scope-94
>> > | |
>> > | Project[bytearray][0] - scope-91
>> > | |
>> > | Project[bytearray][1] - scope-92
>> > | |
>> > | Project[bytearray][2] - scope-93
>> > |
>> > |---my_raw:
>> > Load(hdfs://master:54310/user/root/houred-small:PigStorage(' ')) -
>> > scope-95
>> > --------
>> > Reduce Plan
>> > result: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-77
>> > |
>> > |---result: Package[tuple]{bytearray} - scope-72
>> > --------
>> > Global sort: false
>> >
>> >
>> > On Thu, Mar 8, 2012 at 1:14 AM, Dmitriy Ryaboy <[email protected]>
>> wrote:
>> >
>> >> You are confusing map and reduce tasks with mapreduce jobs. Your pig
>> >> script resulted in a single mapreduce job. The number of map tasks was
>> >> 2, based on input size -- it has little to do with the actual operators
>> >> you used.
>> >>
>> >> There is no union operator involved so I am not sure what you are
>> >> referring to with that.
>> >>
>> >> On Mar 7, 2012, at 8:09 AM, Yongzhi Wang <[email protected]>
>> >> wrote:
>> >>
>> >> > Hi, There
>> >> >
>> >> > I tried to use the "explain" statement, but the MapReduce plan
>> >> > sometimes confuses me.
>> >> >
>> >> > I tried the script below:
>> >> >
>> >> > *my_raw = LOAD './houred-small' USING PigStorage('\t') AS (user, hour, query);
>> >> > part1 = filter my_raw by hour>11;
>> >> > part2 = filter my_raw by hour<13;
>> >> > result = cogroup part1 by hour, part2 by hour;
>> >> > dump result;
>> >> > explain result;*
>> >> >
>> >> > The job stats are shown below, indicating there are 2 Map tasks and
>> >> > 1 reduce task. But I don't know how the Map tasks correspond to the
>> >> > MapReduce plan shown below. It seems each Map task just does one
>> >> > filter and rearrange, but in which phase is the union operation done?
>> >> > The shuffle phase? In that case, the two Map tasks would actually be
>> >> > doing different filter work. Is that possible? Or is my guess wrong?
>> >> >
>> >> > So, back to the question: *Is there any way that I can see the actual
>> >> > map and reduce tasks executed by Pig?*
>> >> >
>> >> > *Job Stats (time in seconds):
>> >> > JobId   Maps    Reduces MaxMapTime      MinMapTime      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
>> >> > job_201203021230_0038   2       1       3       3       3       12      12      12      my_raw,part1,part2,result       COGROUP hdfs://master:54310/tmp/temp626037557/tmp-1661404166,*
>> >> >
>> >> > The mapreduce plan is shown below:*
>> >> > #--------------------------------------------------
>> >> > # Map Reduce Plan
>> >> > #--------------------------------------------------
>> >> > MapReduce node scope-84
>> >> > Map Plan
>> >> > Union[tuple] - scope-85
>> >> > |
>> >> > |---result: Local Rearrange[tuple]{bytearray}(false) - scope-73
>> >> > | | |
>> >> > | | Project[bytearray][1] - scope-74
>> >> > | |
>> >> > | |---part1: Filter[bag] - scope-59
>> >> > | | |
>> >> > | | Greater Than[boolean] - scope-63
>> >> > | | |
>> >> > | | |---Cast[int] - scope-61
>> >> > | | | |
>> >> > | | | |---Project[bytearray][1] - scope-60
>> >> > | | |
>> >> > | | |---Constant(11) - scope-62
>> >> > | |
>> >> > | |---my_raw: New For Each(false,false,false)[bag] - scope-89
>> >> > | | |
>> >> > | | Project[bytearray][0] - scope-86
>> >> > | | |
>> >> > | | Project[bytearray][1] - scope-87
>> >> > | | |
>> >> > | | Project[bytearray][2] - scope-88
>> >> > | |
>> >> > | |---my_raw:
>> >> > Load(hdfs://master:54310/user/root/houred-small:PigStorage(' ')) -
>> >> > scope-90
>> >> > |
>> >> > |---result: Local Rearrange[tuple]{bytearray}(false) - scope-75
>> >> > | |
>> >> > | Project[bytearray][1] - scope-76
>> >> > |
>> >> > |---part2: Filter[bag] - scope-66
>> >> > | |
>> >> > | Less Than[boolean] - scope-70
>> >> > | |
>> >> > | |---Cast[int] - scope-68
>> >> > | | |
>> >> > | | |---Project[bytearray][1] - scope-67
>> >> > | |
>> >> > | |---Constant(13) - scope-69
>> >> > |
>> >> > |---my_raw: New For Each(false,false,false)[bag] - scope-94
>> >> > | |
>> >> > | Project[bytearray][0] - scope-91
>> >> > | |
>> >> > | Project[bytearray][1] - scope-92
>> >> > | |
>> >> > | Project[bytearray][2] - scope-93
>> >> > |
>> >> > |---my_raw:
>> >> > Load(hdfs://master:54310/user/root/houred-small:PigStorage(' ')) -
>> >> > scope-95
>> >> > --------
>> >> > Reduce Plan
>> >> > result: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-77
>> >> > |
>> >> > |---result: Package[tuple]{bytearray} - scope-72
>> >> > --------
>> >> > Global sort: false
>> >> > ----------------*
>> >> >
>> >> > Thanks!
>> >>
>>
>
>