[ https://issues.apache.org/jira/browse/HADOOP-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12718153#action_12718153 ]

Scott Carey commented on HADOOP-5223:
-------------------------------------

I've had issues with shuffle on 0.19 on a cluster with new hardware capable of running 13+ maps and 11+ reduces concurrently per node (dual quad core with hyperthreading = 16 hardware threads, 24GB RAM, 4 drives). Shuffle is always my bottleneck on any job where the maps aren't huge or where they condense the data significantly before the reduce. During this bottleneck, disk, network, and CPU are all nearly idle. I collected quite a few thread dumps in this state across many different jobs. Increasing parallel copies and tasktracker http threads had no effect. For the most part, the thread dumps showed the shuffle fetch threads idle and the main thread here:

"main" prio=10 tid=0x000000005eed3800 nid=0x62a2 waiting on condition 
[0x0000000040eb4000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at 
org.apache.hadoop.mapred.ReduceTask$ReduceCopier.getMapCompletionEvents(ReduceTask.java:2244)
        at 
org.apache.hadoop.mapred.ReduceTask$ReduceCopier.fetchOutputs(ReduceTask.java:1716)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:368)
        at org.apache.hadoop.mapred.Child.main(Child.java:158)


There is a one-line fix. This JIRA is refactoring the code that the fix would apply to, however, so I'll post my findings here to make sure the improvements are carried over into it.
The fix improves times by 40% on a composite set of jobs (a cascading flow consisting of 25+ map/reduce jobs, with up to 7 running concurrently).

First, the fix I made is below.

Comment out or delete the line:
{code} break; //we have a map from this host {code}
in ReduceTask.java, inside ReduceCopier.fetchOutputs() -- line 1804 in 0.19.2-dev, line 1911 in 0.20.1-dev, and line 1954 currently on trunk.
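
To show roughly where that line sits, here is a toy, self-contained sketch of the scheduling behaviour (placeholder names and simplified types, not the actual ReduceTask.java source):

{code}
import java.util.*;

// Toy illustration only (placeholder names, not the real 0.19 code). Each call
// models one scheduling pass over the map outputs known for each host.
class OnePerHostPass {
  static List<String> schedulePass(Map<String, List<String>> pendingByHost) {
    List<String> copyList = new ArrayList<String>();
    for (List<String> outputsOnHost : pendingByHost.values()) {
      for (String mapOutput : outputsOnHost) {
        copyList.add(mapOutput);
        break; // "we have a map from this host" -- removing this line lets the
               // pass schedule every known output for the host, not just one
      }
    }
    return copyList;
  }
}
{code}

With the break in place, each pass schedules at most one output per host; delete it and the whole known list for each host is scheduled in that pass.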

On my set of jobs, reduces that previously averaged 1 to 2 minutes in the shuffle phase while copying very little data now average 1 to 3 seconds in the shuffle.

Here is the problem. The shuffle currently limits itself to copying only one shard from each host per pass, and then sleeps. As long as the number of map shards is much larger than the number of hosts, this forces quite a few sleep delays. In the "before" log further down, for example, the reduce learns of 51 map outputs up front but schedules only 3 per pass (one per host on a 3-node cluster), so it needs roughly 17 schedule-then-sleep cycles to fetch data it already knew about after the first ping. For servers that can handle many map and reduce tasks each, this gets out of hand quickly, especially on small or medium sized clusters where the ideal number of concurrent shuffle copies per reduce is on the order of, or larger than, the number of hosts.
  
A more sophisticated fix such as this JIRA will do more, but the low-hanging-fruit performance fix is to fetch every shard reported by the last ping before sleeping and checking for more. This not only improves shuffle speed, it also reduces the total number of pings needed to find out which shards are available, which reduces load elsewhere. It makes little sense to do what happens now on a small cluster: discover that, say, 100 shards need to be fetched, grab 8 of them, sleep, ask again what is available, grab only 8 more, sleep, and so on.
At the very least, if there are 100 map outputs available to a reducer, it should keep draining that list before sleeping and asking for an updated set -- the toy simulation below puts numbers on this.
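
Here is a small, self-contained simulation of that difference (an assumption-laden toy, not Hadoop code: it assumes one sleep per scheduling pass, 3 hosts, and 100 known map outputs, roughly matching the cluster below):

{code}
// Toy pass-count simulation. Assumptions: every scheduling pass is followed by
// one sleep; the copier thread pool still limits how many fetches run at once.
class ShufflePassCount {
  static int passes(int hosts, int knownOutputs, boolean onePerHostPerPass) {
    int remaining = knownOutputs, count = 0;
    while (remaining > 0) {
      // current behaviour: at most one fetch per host per pass;
      // proposed behaviour: drain everything already known before sleeping
      int scheduled = onePerHostPerPass ? Math.min(hosts, remaining) : remaining;
      remaining -= scheduled;
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    System.out.println("one per host per pass: " + passes(3, 100, true));  // 34 sleeps
    System.out.println("drain the known list:  " + passes(3, 100, false)); // 1 sleep
  }
}
{code}

With the roughly 2-second gap visible between scheduling passes in the "before" log, that difference alone is on the order of a minute of idle time per reduce, consistent with the timings reported above.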

Some may object to opening more than one concurrent connection to a host on the grounds that it could overload a tasktracker -- but that seems like a false assumption to me. First, tasktrackers already throttle this with the configuration parameter for the number of HTTP threads. Second, reduces throttle it with the number of concurrent shuffle fetch threads. There is no difference between a reduce opening 10 concurrent shuffle connections to 10 hosts and opening 10 to one host: when all reduces are doing this concurrently and choosing hosts at random, the average number of concurrent connections to any one TT stays the same.
If it is a serious concern for other reasons (the 'penalty box'? or other error handling?), then the shuffle queue could be filled in a better order than one host at a time, or at least the reduce should not sleep and re-fetch the list without first draining it. A more significant refactor may do better than the one-liner -- but I suspect this alone accounts for most of the performance gain.
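
For reference, the two throttles I mean are ordinary configuration; a minimal example of setting them (property names as used in 0.19/0.20, values matching the test cluster below):

{code}
import org.apache.hadoop.mapred.JobConf;

// Minimal example of the two existing throttles mentioned above; the values
// here are simply the ones used on the test cluster described below.
class ShuffleThrottles {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setInt("mapred.reduce.parallel.copies", 10); // copier threads per reduce
    conf.setInt("tasktracker.http.threads", 40);      // HTTP server threads per TT
    System.out.println("parallel copies = "
        + conf.getInt("mapred.reduce.parallel.copies", 5));
  }
}
{code}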


Here is a sample log before and after the change on 0.19, on a small dev cluster with newer hardware -- a particularly bad case for this: 3 TTs, each configured for 13 concurrent maps, 11 concurrent reduces, 10 concurrent shuffle copies, and 40 TT HTTP threads:

Before: {code}
2009-06-09 22:13:53,657 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200906082206_0006_r_000004_0 Need another 51 map output(s) where 0 is 
already in progress
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200906082206_0006_m_000050_0'
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200906082206_0006_r_000004_0: Got 51 new map-outputs
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200906082206_0006_r_000004_0: Got 1 obsolete map-outputs from 
tasktracker 
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200906082206_0006_r_000004_0 Scheduled 3 outputs (0 slow hosts and0 dup 
hosts)
2009-06-09 22:13:53,689 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
54394 bytes (54398 raw bytes) into RAM from attempt_200906082206_0006_m_000014_0
2009-06-09 22:13:53,690 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
70736 bytes (70740 raw bytes) into RAM from attempt_200906082206_0006_m_000003_0
2009-06-09 22:13:53,690 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
73540 bytes (73544 raw bytes) into RAM from attempt_200906082206_0006_m_000001_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 54394 
bytes from map-output for attempt_200906082206_0006_m_000014_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 70736 
bytes from map-output for attempt_200906082206_0006_m_000003_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 73540 
bytes from map-output for attempt_200906082206_0006_m_000001_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200906082206_0006_m_000014_0 -> (21, 205) from 10.3.0.142
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200906082206_0006_m_000003_0 -> (21, 240) from 10.3.0.143
2009-06-09 22:13:53,693 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200906082206_0006_m_000001_0 -> (21, 204) from 10.3.0.141
2009-06-09 22:13:55,662 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200906082206_0006_r_000004_0 Scheduled 3 outputs (0 slow hosts and0 dup 
hosts)
  
 -- SNIP --

2009-06-09 22:14:49,753 INFO org.apache.hadoop.mapred.ReduceTask: Read 79913 
bytes from map-output for attempt_200906082206_0006_m_000042_0
2009-06-09 22:14:49,753 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200906082206_0006_m_000042_0 -> (21, 237) from 10.3.0.141
2009-06-09 22:14:50,751 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram 
manager
2009-06-09 22:14:50,751 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved 
on-disk merge complete: 0 files left.
2009-06-09 22:14:50,752 INFO org.apache.hadoop.mapred.ReduceTask: In-memory 
merge complete: 51 files left.
2009-06-09 22:14:50,813 INFO org.apache.hadoop.mapred.Merger: Merging 51 sorted 
segments
2009-06-09 22:14:50,817 INFO org.apache.hadoop.mapred.Merger: Down to the last 
merge-pass, with 51 segments left of total size: 3325252 bytes
{code}

After -- (slightly different job): {code}
2009-06-08 23:51:07,057 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200906082336_0014_r_000009_0 Thread waiting: Thread for merging on-disk 
files
2009-06-08 23:51:07,058 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200906082336_0014_r_000009_0 Need another 68 map output(s) where 0 is 
already in progress
2009-06-08 23:51:07,069 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200906082336_0014_m_000060_1'
2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200906082336_0014_m_000014_0'
2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200906082336_0014_r_000009_0: Got 68 new map-outputs
2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200906082336_0014_r_000009_0: Got 2 obsolete map-outputs from 
tasktracker 
2009-06-08 23:51:07,071 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200906082336_0014_r_000009_0 Scheduled 68 outputs (0 slow hosts and0 
dup hosts)
2009-06-08 23:51:07,106 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
674904 bytes (674908 raw bytes) into RAM from 
attempt_200906082336_0014_m_000005_0
2009-06-08 23:51:07,110 INFO org.apache.hadoop.mapred.ReduceTask: Read 674904 
bytes from map-output for attempt_200906082336_0014_m_000005_0
2009-06-08 23:51:07,110 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200906082336_0014_m_000005_0 -> (61, 26) from 10.3.0.143

-- SNIP --

2009-06-08 23:51:08,389 INFO org.apache.hadoop.mapred.ReduceTask: Read 1439739 
bytes from map-output for attempt_200906082336_0014_m_000012_1
2009-06-08 23:51:08,389 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200906082336_0014_m_000012_1 -> (50, 25) from 10.3.0.141
2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram 
manager
2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved 
on-disk merge complete: 0 files left.
2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: In-memory 
merge complete: 68 files left.
2009-06-08 23:51:09,122 INFO org.apache.hadoop.mapred.Merger: Merging 68 sorted 
segments
2009-06-08 23:51:09,126 INFO org.apache.hadoop.mapred.Merger: Down to the last 
merge-pass, with 52 segments left of total size: 44450255 bytes
{code}


> Refactor reduce shuffle code
> ----------------------------
>
>                 Key: HADOOP-5223
>                 URL: https://issues.apache.org/jira/browse/HADOOP-5223
>             Project: Hadoop Core
>          Issue Type: Improvement
>          Components: mapred
>            Reporter: Owen O'Malley
>            Assignee: Owen O'Malley
>             Fix For: 0.21.0
>
>         Attachments: HADOOP-5233_api.patch, HADOOP-5233_part0.patch
>
>
> The reduce shuffle code has become very complex and entangled. I think we 
> should move it out of ReduceTask and into a separate package 
> (org.apache.hadoop.mapred.task.reduce). Details to follow.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
