[ https://issues.apache.org/jira/browse/HADOOP-2095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12596300#action_12596300 ]

Arun C Murthy commented on HADOOP-2095:
---------------------------------------

bq. To clarify some of the thinking above... The short term goal is not to find 
the optimal solution. It is to get something done that is clean, understandable 
and works acceptably well in all cases. We can refine from there.

+1

Along a similar tangent, here is an even simpler proposal for a first cut. 
Please bear in mind that this proposal stems from our observation that the 
merge code is actually the juicier target to fix: on large jobs we have seen 
reduces (which started _after_ all maps were completed) spending far more 
time in merge than in shuffle, e.g. 13 mins in shuffle, 17 mins in merge and 
15 mins in reduce. So the idea is to fix the OOM to ensure better reliability 
in a reasonably straightforward, simple manner, and then go after merge in 
HADOOP-3366.

Here goes:
1. Use a compressed stream for map-outputs, not {record|block}-compressed 
sequence-files. Ensure both the compressed and decompressed sizes are 
available to the reduce _before_ it actually shuffles the bytes (see the 
header sketch after this list).
2. Decompress map-outputs as soon as they are shuffled into ramfs if there is 
enough space, else suspend the shuffle as described above by Eric. This 
ensures we never hit the #codecs limit; we just need as many codecs as the 
number of threads doing the shuffle. The idea is that the merge-factor is 
going to be limited by the #codecs anyway, so we might as well burn up RAM. 
Storing the outputs compressed could be a further refinement in a separate 
issue.
3. If RAM is more than (say) 50% full, we start merging in-memory. Also, we 
should initially use up as much RAM as possible, allowing for some slack. 
Therefore I propose we do away with the *fs.inmemory.size.mb* config knob and 
use 3/4 or 2/3 of the available heap-size as the RAM limit.
4. If the split is greater than 10% or 25% of the available RAM limit, or 
there is no RAM available, we shuffle it directly to disk (compressed); see 
the budget sketch after this list.
5. The output of the merge is compressed and written to disk, and could 
potentially be merged along with (4) above.
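
For (1), the reduce needs both lengths up-front. Here is a minimal sketch of 
the kind of per-output header that could carry them; the class and field 
names are hypothetical, purely illustrative, and nothing like this exists in 
the current code:

{code:java}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical per-partition header the map side could write ahead of the
// compressed payload, so the reduce knows both sizes before shuffling a byte.
class MapOutputHeader {
  long compressedLength;    // bytes that travel over the wire
  long decompressedLength;  // bytes once inflated into ramfs

  void write(DataOutput out) throws IOException {
    out.writeLong(compressedLength);
    out.writeLong(decompressedLength);
  }

  static MapOutputHeader read(DataInput in) throws IOException {
    MapOutputHeader header = new MapOutputHeader();
    header.compressedLength = in.readLong();
    header.decompressedLength = in.readLong();
    return header;
  }
}
{code}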
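
And for (2)-(4), a rough sketch of the bookkeeping each reduce could keep; 
again all the names (MemoryBudget, Disposition etc.) are hypothetical and 
purely illustrative, not existing ReduceTask code:

{code:java}
// Tracks the decompressed bytes held in ramfs and decides where each
// shuffled map-output goes; the thresholds are the straw-man numbers above.
public class MemoryBudget {
  // (3): derive the RAM limit from the heap instead of the
  // fs.inmemory.size.mb knob, e.g. 2/3 of the max heap.
  private final long ramLimit = (Runtime.getRuntime().maxMemory() * 2) / 3;

  private static final double MERGE_THRESHOLD = 0.50;    // (3)
  private static final double MAX_SINGLE_SEGMENT = 0.25; // (4)

  private long used = 0;  // decompressed bytes currently sitting in ramfs

  public enum Disposition { RAM, DISK, WAIT }

  // The decompressed size is known up-front thanks to (1).
  public synchronized Disposition place(long decompressedSize) {
    if (decompressedSize > MAX_SINGLE_SEGMENT * ramLimit) {
      return Disposition.DISK;  // (4): too big, shuffle straight to disk
    }
    if (used + decompressedSize > ramLimit) {
      return Disposition.WAIT;  // (2): suspend the shuffle for now
    }
    used += decompressedSize;   // reserve the space in ramfs
    return Disposition.RAM;
  }

  // (3): the in-memory merge thread polls this to decide when to start.
  public synchronized boolean shouldMerge() {
    return used > MERGE_THRESHOLD * ramLimit;
  }

  // Called once a merged segment has been evicted from ramfs to disk.
  public synchronized void release(long decompressedSize) {
    used -= decompressedSize;
  }
}
{code}

The WAIT case is where Eric's suspension scheme would slot in: the copier 
threads retry once release() frees enough room.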

Hopefully this is reasonably simple and coherent. I'll put up more thoughts on 
HADOOP-3366 about merge improvements, current pitfalls etc.

Thoughts?

> Reducer failed due to Out of Memory
> -----------------------------------
>
>                 Key: HADOOP-2095
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2095
>             Project: Hadoop Core
>          Issue Type: Bug
>          Components: mapred
>    Affects Versions: 0.15.0
>            Reporter: Runping Qi
>            Assignee: Arun C Murthy
>         Attachments: HADOOP-2095_CompressedBytesWithCodecPool.patch, 
> HADOOP-2095_debug.patch
>
>
> One of the reducers of my job failed with the following exceptions.
> The failure caused the whole job to fail eventually.
> The Java heap size was 768MB and io.sort.mb was 140.
> 2007-10-23 19:24:06,100 WARN org.apache.hadoop.mapred.ReduceTask: 
> task_200710231912_0001_r_000020_2 Intermediate Merge of the inmemory files 
> threw an exception: java.lang.OutOfMemoryError: Java heap space
>       at 
> org.apache.hadoop.io.compress.DecompressorStream.<init>(DecompressorStream.java:43)
>       at 
> org.apache.hadoop.io.compress.DefaultCodec.createInputStream(DefaultCodec.java:71)
>       at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1345)
>       at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1231)
>       at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1154)
>       at 
> org.apache.hadoop.io.SequenceFile$Sorter$SegmentDescriptor.nextRawKey(SequenceFile.java:2726)
>       at 
> org.apache.hadoop.io.SequenceFile$Sorter$MergeQueue.merge(SequenceFile.java:2543)
>       at 
> org.apache.hadoop.io.SequenceFile$Sorter.merge(SequenceFile.java:2297)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:1311)
> 2007-10-23 19:24:06,102 INFO org.apache.hadoop.mapred.ReduceTask: 
> task_200710231912_0001_r_000020_2 done copying 
> task_200710231912_0001_m_001428_0 output .
> 2007-10-23 19:24:06,185 INFO org.apache.hadoop.fs.FileSystem: Initialized 
> InMemoryFileSystem: 
> ramfs://mapoutput31952838/task_200710231912_0001_r_000020_2/map_1423.out-0 of 
> size (in bytes): 209715200
> 2007-10-23 19:24:06,193 ERROR org.apache.hadoop.mapred.ReduceTask: Map output 
> copy failure: java.lang.NullPointerException
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
>       at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
>       at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)
> 2007-10-23 19:24:06,193 INFO org.apache.hadoop.mapred.ReduceTask: 
> task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001215_0 
> output from xxx
> 2007-10-23 19:24:06,188 INFO org.apache.hadoop.mapred.ReduceTask: 
> task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001211_0 
> output from xxx
> 2007-10-23 19:24:06,185 ERROR org.apache.hadoop.mapred.ReduceTask: Map output 
> copy failure: java.lang.NullPointerException
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryOutputStream.close(InMemoryFileSystem.java:161)
>       at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:49)
>       at 
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:64)
>       at 
> org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.close(ChecksumFileSystem.java:312)
>       at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:49)
>       at 
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:64)
>       at 
> org.apache.hadoop.mapred.MapOutputLocation.getFile(MapOutputLocation.java:253)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:713)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)
> 2007-10-23 19:24:06,199 INFO org.apache.hadoop.mapred.ReduceTask: 
> task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001247_0 
> output from .
> 2007-10-23 19:24:06,200 ERROR org.apache.hadoop.mapred.ReduceTask: Map output 
> copy failure: java.lang.NullPointerException
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
>       at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
>       at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)
> 2007-10-23 19:24:06,204 INFO org.apache.hadoop.mapred.ReduceTask: 
> task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001422_0 
> output from .
> 2007-10-23 19:24:06,207 ERROR org.apache.hadoop.mapred.ReduceTask: Map output 
> copy failure: java.lang.NullPointerException
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
>       at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
>       at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)
> 2007-10-23 19:24:06,209 INFO org.apache.hadoop.mapred.ReduceTask: 
> task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001278_0 
> output from .
> 2007-10-23 19:24:06,198 WARN org.apache.hadoop.mapred.TaskTracker: Error 
> running child
> java.io.IOException: task_200710231912_0001_r_000020_2 The reduce copier failed
>       at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:253)
>       at 
> org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:1760)
> 2007-10-23 19:24:06,198 ERROR org.apache.hadoop.mapred.ReduceTask: Map output 
> copy failure: java.lang.NullPointerException
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
>       at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
>       at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)
> 2007-10-23 19:24:06,231 INFO org.apache.hadoop.mapred.ReduceTask: 
> task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001531_0 
> output from .
> 2007-10-23 19:24:06,197 ERROR org.apache.hadoop.mapred.ReduceTask: Map output 
> copy failure: java.lang.NullPointerException
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
>       at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
>       at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)
> 2007-10-23 19:24:06,237 INFO org.apache.hadoop.mapred.ReduceTask: 
> task_200710231912_0001_r_000020_2 Copying task_200710231912_0001_m_001227_0 
> output from .
> 2007-10-23 19:24:06,196 ERROR org.apache.hadoop.mapred.ReduceTask: Map output 
> copy failure: java.lang.NullPointerException
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$FileAttributes.access$300(InMemoryFileSystem.java:366)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem$InMemoryFileStatus.<init>(InMemoryFileSystem.java:378)
>       at 
> org.apache.hadoop.fs.InMemoryFileSystem$RawInMemoryFileSystem.getFileStatus(InMemoryFileSystem.java:283)
>       at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:251)
>       at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:449)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:738)
>       at 
> org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:665)

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.