[ 
https://issues.apache.org/jira/browse/HADOOP-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12519492
 ] 

Tahir Hashmi commented on HADOOP-1698:
--------------------------------------

Minor nit with the latest patch. SequenceFile.java still has the following 
comment, which no longer applies:
   // use hash of uid + host

> 7500+ reducers/partitions causes job to hang
> --------------------------------------------
>
>                 Key: HADOOP-1698
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1698
>             Project: Hadoop
>          Issue Type: Bug
>          Components: examples
>    Affects Versions: 0.13.1
>         Environment: Standard hadoop installation, any number of nodes > 10
>            Reporter: Srikanth Kakani
>            Assignee: Devaraj Das
>            Priority: Blocker
>             Fix For: 0.14.0
>
>         Attachments: 1698.patch, 1698.patch, 1698.patch
>
>
> Steps to Reproduce:
> On the above cluster run any job with #partitions/reducers = 8000+
> Observe CPU utilization on any mapper.
> Observations:
> The output.collect(Key, Value) call takes a huge amount of CPU, causing the 
> job to hang.
> This is a result of two issues:
> 1) Number of partitions beyond 7500 results in a call to sortAndSpillToDisk() 
> on each call to output.collect
> 2) Call to sortAndSpillToDisk causes creation of a writer object, eventually 
> calling:
>  MessageDigest digester = MessageDigest.getInstance("MD5");
>         digester.update((new 
> UID()+"@"+InetAddress.getLocalHost()).getBytes());
>         sync = digester.digest();
> A code-block in  SequenceFile.java(652)
> Issue #1:
> Further investigation reveals the following stack trace whenever the task is 
> suspended.
>   [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
>   [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
>   [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
>   [4] java.net.InetAddress.getLocalHost (InetAddress.java:1,312)
>   [5] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:653)
>   [6] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:622)
>   [7] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:386)
>   [8] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:412)
>   [9] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.startPartition 
> (MapTask.java:307)
>   [10] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk 
> (MapTask.java:387)
>   [11] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect 
> (MapTask.java:355)
> /*My code*/
>   [12] mypackage.MyClass$Map.map (MyClass.java:283)
> --------------
>   [13] org.apache.hadoop.mapred.MapRunner.run (MapRunner.java:46)
>   [14] org.apache.hadoop.mapred.MapTask.run (MapTask.java:189)
>   [15] org.apache.hadoop.mapred.TaskTracker$Child.main 
> (TaskTracker.java:1,771)
> The piece of code causing the problem is (MapTask.java:355)
> ----------------------------------------------------------
>         long totalMem = 0;
>         for (int i = 0; i < partitions; i++)
>           totalMem += sortImpl[i].getMemoryUtilized();  <---- == 16K 
> (BasicTypeSorterBase.java(88) (startOffsets.length (below)) * 
> BUFFERED_KEY_VAL_OVERHEAD;
>         if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) { 
> <----------------condition is always true if partitions > 7500
>           sortAndSpillToDisk();
>           keyValBuffer.reset();
>           for (int i = 0; i < partitions; i++) {
>             sortImpl[i].close();
>           }
>         }
> ----------------------------------------------------------
> Looking at the variable values in  
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
>  sortImpl[0] = {
>     org.apache.hadoop.mapred.BasicTypeSorterBase.keyValBuffer: instance of 
> org.apache.hadoop.io.DataOutputBuffer(id=1159)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.startOffsets: instance of 
> int[1024] (id=1160) <--1K * 16 (previously explained) == 16K
>     org.apache.hadoop.mapred.BasicTypeSorterBase.keyLengths: instance of 
> int[1024] (id=1161)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.valueLengths: instance of 
> int[1024] (id=1162)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.pointers: instance of 
> int[1024] (id=1163)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.comparator: instance of 
> org.apache.hadoop.io.MD5Hash$Comparator(id=1164)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.count: 0
>     org.apache.hadoop.mapred.BasicTypeSorterBase.BUFFERED_KEY_VAL_OVERHEAD: 16
>     org.apache.hadoop.mapred.BasicTypeSorterBase.reporter: instance of 
> org.apache.hadoop.mapred.Task$2(id=1165)
> }
> Computation
> maxBufferSize == 120M 
> therotical max #of partitions assuming 0 keyValBuffer.getLength() =120M/16K = 
> 7500 partitions
> Issue #2: 
> digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
>   [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
>   [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
>   [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
> InetAddress.getLocalHost() call does not cache results, this results in a 
> look up to the host file and DNS(???) bumping up the CPU usage even higher 
> (Observed).
> This is a BLOCKER issue and needs immediate attention. 
> Notes:
> 1) Output.collect should not take hit from framework, separate thread to 
> handle spill buffer?
> 2) InetAddress.getLocalHost result should be cached in a static variable?
> 3) Spilling logic should not involve #of partitions, needs redesign?

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to