[
https://issues.apache.org/jira/browse/HADOOP-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12718153#action_12718153
]
Scott Carey commented on HADOOP-5223:
-------------------------------------
I've had issues with the shuffle on 0.19 on a cluster with new hardware capable of
running 13+ maps and 11+ reduces concurrently per node (dual quad-core with
hyperthreading = 16 hardware threads, 24GB RAM, 4 drives). The shuffle is always my
bottleneck on any job where the maps aren't huge or where they condense the data
significantly before the reduce. During this bottleneck, disk, network, and CPU are
all nearly idle. I collected quite a few thread dumps in this state across many
different jobs; increasing parallel copies and tasktracker HTTP threads had no
effect. In almost every dump the shuffle fetch threads were idle and the main thread
looked like this:
"main" prio=10 tid=0x000000005eed3800 nid=0x62a2 waiting on condition
[0x0000000040eb4000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at
org.apache.hadoop.mapred.ReduceTask$ReduceCopier.getMapCompletionEvents(ReduceTask.java:2244)
at
org.apache.hadoop.mapred.ReduceTask$ReduceCopier.fetchOutputs(ReduceTask.java:1716)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:368)
at org.apache.hadoop.mapred.Child.main(Child.java:158)
There is a one-line fix. However, since this JIRA is refactoring the code that the fix
would apply to, I'll post my findings here so the improvement is carried over into it.
The fix improves times by 40% on a composite set of jobs (a Cascading flow consisting
of 25+ map/reduce jobs, with up to 7 running concurrently).
First, the fix I made: comment out or delete the line
{code}
break; //we have a map from this host
{code}
in ReduceTask.java, in ReduceCopier.fetchOutputs() -- line 1804 in 0.19.2-dev, line 1911
in 0.20.1-dev, and line 1954 currently on trunk.
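For context, here is a rough sketch of the scheduling behavior that break produces. This
is not the actual 0.19 source -- the class, field, and method names below are
approximations made up for illustration -- but it shows where the one-liner sits: with the
break, each scheduling pass takes at most one known output per host, and removing it lets
a pass drain everything already known for that host before going back to sleep.
{code}
import java.util.*;

// Simplified, hypothetical sketch of the per-host scheduling in fetchOutputs().
// Names are approximations, not the real ReduceCopier fields.
class ShuffleSchedulingSketch {
  // map outputs this reduce already knows about, grouped by host
  Map<String, List<String>> knownOutputsByHost = new HashMap<String, List<String>>();
  List<String> scheduledCopies = new ArrayList<String>(); // picked up by copier threads
  Set<String> alreadyHandled = new HashSet<String>();

  void schedulePass() {
    for (Map.Entry<String, List<String>> host : knownOutputsByHost.entrySet()) {
      for (String mapOutput : host.getValue()) {
        if (!alreadyHandled.contains(mapOutput)) {
          scheduledCopies.add(mapOutput);
          alreadyHandled.add(mapOutput);
          break; // <-- the one-liner: only one output per host per pass
        }
      }
    }
    // ...the real code then sleeps and re-polls for completion events,
    // even though knownOutputsByHost may be nowhere near drained.
  }
}
{code}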
Reduces that previously averaged 1 to 2 minutes in the shuffle phase while copying very
little data now average 1 to 3 seconds in the shuffle across my set of jobs.
Here is the problem. The shuffle currently limits itself to copying only one map output
shard per host per pass, and then sleeps. As long as the number of map shards is much
larger than the number of hosts, this forces quite a few sleep delays. For servers that
can each handle many map and reduce tasks, it gets out of hand quickly, especially on
small or medium-sized clusters where the ideal number of concurrent shuffle copies per
reduce is on the order of, or larger than, the number of hosts.
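To put rough numbers on that, here is a back-of-envelope estimate using the figures
visible in the "Before" log further down (51 outputs needed, 3 TTs, and scheduling passes
roughly 2 seconds apart judging by the log timestamps). It is an illustration only, not
measured instrumentation:
{code}
// Back-of-envelope only; the ~2 s pass interval is read off the "Before" log
// timestamps below, not taken from the source code.
public class ShufflePassEstimate {
  public static void main(String[] args) {
    int outputsNeeded = 51;       // "Need another 51 map output(s)"
    int hosts = 3;                // 3 TTs => at most 3 outputs scheduled per pass
    double passIntervalSec = 2.0; // gap between successive "Scheduled 3 outputs" lines
    int passes = (outputsNeeded + hosts - 1) / hosts;     // 17 passes
    System.out.println("passes=" + passes
        + ", minimum shuffle seconds=" + passes * passIntervalSec); // ~34 s, mostly sleeping
  }
}
{code}
That floor of roughly 34 seconds of mostly idle waiting is in line with the nearly one
minute the "Before" log spends in the shuffle while moving only a few megabytes.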
A more sophisticated fix such as this JIRA will do more, but the low-hanging-fruit
performance fix is to fetch every shard reported by the last ping before sleeping and
checking for more. This not only improves shuffle speed, it also reduces the total number
of pings needed to find out which shards are available, which in turn reduces load
elsewhere. It makes little sense to do what happens now on a small cluster: discover that,
say, 100 shards need to be fetched, grab 8 of them, sleep, ask again what is available,
grab only 8 more, sleep, and so on. At the very least, if there are 100 map outputs
available to a reducer, it should keep draining that list before sleeping and asking for
an updated set.
Some may object to opening more than one concurrent connection to a host on the grounds
that it could overload a tasktracker -- but that seems like a false assumption to me.
First, tasktrackers already throttle this with the configuration parameter for the number
of HTTP threads. Second, reduces throttle it with the number of concurrent shuffle fetch
threads. There is no real difference between a reduce opening 10 concurrent shuffle
connections to 10 hosts and 10 to one host: when all reduces are doing this concurrently
and choosing hosts at random, the average number of concurrent connections on any one TT
stays the same.
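For reference, these are the two throttles I mean. The sketch below only shows how I
understand the two sides read their limits -- the property names are the ones tuned on my
cluster, but treat the defaults and the exact read sites as approximate rather than a
quote of the source:
{code}
import org.apache.hadoop.mapred.JobConf;

// Sketch only: the two shuffle throttles, as I understand them in the 0.19/0.20 era.
// Defaults shown are approximate; check hadoop-default.xml for your version.
public class ShuffleThrottles {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Per-reduce limit: number of parallel shuffle fetch threads.
    int parallelCopies = conf.getInt("mapred.reduce.parallel.copies", 5);
    // Per-tasktracker limit: HTTP worker threads serving map output.
    int httpThreads = conf.getInt("tasktracker.http.threads", 40);
    System.out.println("parallel copies=" + parallelCopies
        + ", TT http threads=" + httpThreads);
  }
}
{code}
Either limit caps the connection count regardless of how a reduce spreads its copies
across hosts, which is the point: the one-output-per-host break is not what protects the
TT.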
If it is a serious concern for other reasons (the 'penalty box'? other error handling?),
then the shuffle queue could be filled in a better order than one host at a time, or at
least not sleep and re-fetch the list without first draining it. A more significant
refactor may do better than the one-liner -- but I suspect this alone accounts for most of
the performance gain.
Here are sample logs from before and after the change on 0.19, on a small dev cluster
with newer hardware -- a particularly bad case for this: 3 TTs, each configured for 13
concurrent maps, 11 concurrent reduces, 10 concurrent shuffle copies, and 40 TT HTTP
threads:
Before: {code}
2009-06-09 22:13:53,657 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200906082206_0006_r_000004_0 Need another 51 map output(s) where 0 is
already in progress
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring
obsolete output of KILLED map-task: 'attempt_200906082206_0006_m_000050_0'
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200906082206_0006_r_000004_0: Got 51 new map-outputs
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200906082206_0006_r_000004_0: Got 1 obsolete map-outputs from
tasktracker
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200906082206_0006_r_000004_0 Scheduled 3 outputs (0 slow hosts and0 dup
hosts)
2009-06-09 22:13:53,689 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling
54394 bytes (54398 raw bytes) into RAM from attempt_200906082206_0006_m_000014_0
2009-06-09 22:13:53,690 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling
70736 bytes (70740 raw bytes) into RAM from attempt_200906082206_0006_m_000003_0
2009-06-09 22:13:53,690 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling
73540 bytes (73544 raw bytes) into RAM from attempt_200906082206_0006_m_000001_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 54394
bytes from map-output for attempt_200906082206_0006_m_000014_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 70736
bytes from map-output for attempt_200906082206_0006_m_000003_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 73540
bytes from map-output for attempt_200906082206_0006_m_000001_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from
attempt_200906082206_0006_m_000014_0 -> (21, 205) from 10.3.0.142
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from
attempt_200906082206_0006_m_000003_0 -> (21, 240) from 10.3.0.143
2009-06-09 22:13:53,693 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from
attempt_200906082206_0006_m_000001_0 -> (21, 204) from 10.3.0.141
2009-06-09 22:13:55,662 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200906082206_0006_r_000004_0 Scheduled 3 outputs (0 slow hosts and0 dup
hosts)
-- SNIP --
2009-06-09 22:14:49,753 INFO org.apache.hadoop.mapred.ReduceTask: Read 79913
bytes from map-output for attempt_200906082206_0006_m_000042_0
2009-06-09 22:14:49,753 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from
attempt_200906082206_0006_m_000042_0 -> (21, 237) from 10.3.0.141
2009-06-09 22:14:50,751 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram
manager
2009-06-09 22:14:50,751 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved
on-disk merge complete: 0 files left.
2009-06-09 22:14:50,752 INFO org.apache.hadoop.mapred.ReduceTask: In-memory
merge complete: 51 files left.
2009-06-09 22:14:50,813 INFO org.apache.hadoop.mapred.Merger: Merging 51 sorted
segments
2009-06-09 22:14:50,817 INFO org.apache.hadoop.mapred.Merger: Down to the last
merge-pass, with 51 segments left of total size: 3325252 bytes
{code}
After -- (slightly different job): {code}
2009-06-08 23:51:07,057 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200906082336_0014_r_000009_0 Thread waiting: Thread for merging on-disk
files
2009-06-08 23:51:07,058 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200906082336_0014_r_000009_0 Need another 68 map output(s) where 0 is
already in progress
2009-06-08 23:51:07,069 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring
obsolete output of KILLED map-task: 'attempt_200906082336_0014_m_000060_1'
2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring
obsolete output of KILLED map-task: 'attempt_200906082336_0014_m_000014_0'
2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200906082336_0014_r_000009_0: Got 68 new map-outputs
2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200906082336_0014_r_000009_0: Got 2 obsolete map-outputs from
tasktracker
2009-06-08 23:51:07,071 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_200906082336_0014_r_000009_0 Scheduled 68 outputs (0 slow hosts and0
dup hosts)
2009-06-08 23:51:07,106 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling
674904 bytes (674908 raw bytes) into RAM from
attempt_200906082336_0014_m_000005_0
2009-06-08 23:51:07,110 INFO org.apache.hadoop.mapred.ReduceTask: Read 674904
bytes from map-output for attempt_200906082336_0014_m_000005_0
2009-06-08 23:51:07,110 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from
attempt_200906082336_0014_m_000005_0 -> (61, 26) from 10.3.0.143
-- SNIP --
2009-06-08 23:51:08,389 INFO org.apache.hadoop.mapred.ReduceTask: Read 1439739
bytes from map-output for attempt_200906082336_0014_m_000012_1
2009-06-08 23:51:08,389 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from
attempt_200906082336_0014_m_000012_1 -> (50, 25) from 10.3.0.141
2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram
manager
2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved
on-disk merge complete: 0 files left.
2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: In-memory
merge complete: 68 files left.
2009-06-08 23:51:09,122 INFO org.apache.hadoop.mapred.Merger: Merging 68 sorted
segments
2009-06-08 23:51:09,126 INFO org.apache.hadoop.mapred.Merger: Down to the last
merge-pass, with 52 segments left of total size: 44450255 bytes
{code}
> Refactor reduce shuffle code
> ----------------------------
>
> Key: HADOOP-5223
> URL: https://issues.apache.org/jira/browse/HADOOP-5223
> Project: Hadoop Core
> Issue Type: Improvement
> Components: mapred
> Reporter: Owen O'Malley
> Assignee: Owen O'Malley
> Fix For: 0.21.0
>
> Attachments: HADOOP-5233_api.patch, HADOOP-5233_part0.patch
>
>
> The reduce shuffle code has become very complex and entangled. I think we
> should move it out of ReduceTask and into a separate package
> (org.apache.hadoop.mapred.task.reduce). Details to follow.