[ https://issues.apache.org/jira/browse/HADOOP-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12519492 ]
Tahir Hashmi commented on HADOOP-1698:
--------------------------------------

Minor nit with the latest patch. SequenceFile.java still has the following comment, which no longer applies:

    // use hash of uid + host

> 7500+ reducers/partitions causes job to hang
> --------------------------------------------
>
>                 Key: HADOOP-1698
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1698
>             Project: Hadoop
>          Issue Type: Bug
>          Components: examples
>    Affects Versions: 0.13.1
>         Environment: Standard hadoop installation, any number of nodes > 10
>            Reporter: Srikanth Kakani
>            Assignee: Devaraj Das
>            Priority: Blocker
>             Fix For: 0.14.0
>
>         Attachments: 1698.patch, 1698.patch, 1698.patch
>
>
> Steps to Reproduce:
> On the above cluster, run any job with #partitions/reducers = 8000+ and observe CPU utilization on any mapper.
>
> Observations:
> The output.collect(Key, Value) call takes a huge amount of CPU, causing the job to hang. This is a result of two issues:
> 1) A number of partitions beyond 7500 results in a call to sortAndSpillToDisk() on each call to output.collect.
> 2) Each call to sortAndSpillToDisk creates a writer object, which eventually executes the following code block in SequenceFile.java (line 652):
>
>     MessageDigest digester = MessageDigest.getInstance("MD5");
>     digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
>     sync = digester.digest();
>
> Issue #1:
> Further investigation reveals the following stack trace whenever the task is suspended:
>
>   [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
>   [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
>   [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
>   [4] java.net.InetAddress.getLocalHost (InetAddress.java:1,312)
>   [5] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:653)
>   [6] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:622)
>   [7] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:386)
>   [8] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:412)
>   [9] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.startPartition (MapTask.java:307)
>  [10] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk (MapTask.java:387)
>  [11] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
>       /* My code */
>  [12] mypackage.MyClass$Map.map (MyClass.java:283)
>       --------------
>  [13] org.apache.hadoop.mapred.MapRunner.run (MapRunner.java:46)
>  [14] org.apache.hadoop.mapred.MapTask.run (MapTask.java:189)
>  [15] org.apache.hadoop.mapred.TaskTracker$Child.main (TaskTracker.java:1,771)
>
> The piece of code causing the problem is in MapTask$MapOutputBuffer.collect (MapTask.java:355):
> ----------------------------------------------------------
>   long totalMem = 0;
>   for (int i = 0; i < partitions; i++)
>     totalMem += sortImpl[i].getMemoryUtilized();
>     // == 16K per partition: startOffsets.length (1024, see below)
>     // * BUFFERED_KEY_VAL_OVERHEAD (16); BasicTypeSorterBase.java:88
>   if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
>     // <-- this condition is always true if partitions > 7500
>     sortAndSpillToDisk();
>     keyValBuffer.reset();
>     for (int i = 0; i < partitions; i++) {
>       sortImpl[i].close();
>     }
>   }
> ----------------------------------------------------------
>
> Looking at the variable values in org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355):
>
>   sortImpl[0] = {
>     org.apache.hadoop.mapred.BasicTypeSorterBase.keyValBuffer: instance of org.apache.hadoop.io.DataOutputBuffer (id=1159)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.startOffsets: instance of int[1024] (id=1160)  <-- 1K entries * 16 bytes == 16K, as explained above
>     org.apache.hadoop.mapred.BasicTypeSorterBase.keyLengths: instance of int[1024] (id=1161)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.valueLengths: instance of int[1024] (id=1162)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.pointers: instance of int[1024] (id=1163)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.comparator: instance of org.apache.hadoop.io.MD5Hash$Comparator (id=1164)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.count: 0
>     org.apache.hadoop.mapred.BasicTypeSorterBase.BUFFERED_KEY_VAL_OVERHEAD: 16
>     org.apache.hadoop.mapred.BasicTypeSorterBase.reporter: instance of org.apache.hadoop.mapred.Task$2 (id=1165)
>   }
>
> Computation:
> maxBufferSize == 120M
> Theoretical max # of partitions, assuming keyValBuffer.getLength() == 0: 120M / 16K = 7500 partitions.
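>
> For illustration, a minimal sketch of that arithmetic (constants as reported above; the class and variable names are hypothetical, not Hadoop code):
>
>   public class SpillThresholdMath {
>       public static void main(String[] args) {
>           int startOffsetsLength = 1024;            // startOffsets.length per sorter
>           int bufferedKeyValOverhead = 16;          // BUFFERED_KEY_VAL_OVERHEAD
>           long maxBufferSize = 120L * 1024 * 1024;  // maxBufferSize == 120M
>
>           // Each partition's sorter reports 1024 * 16 == 16K of memory
>           // utilized, even when it holds zero records (count == 0).
>           long memPerPartition = (long) startOffsetsLength * bufferedKeyValOverhead;
>
>           // Partition count at which totalMem alone reaches maxBufferSize:
>           // 125829120 / 16384 == 7680, i.e. roughly the 7500 figure above.
>           long spillAlwaysAt = maxBufferSize / memPerPartition;
>
>           System.out.println(memPerPartition + " bytes per empty partition; "
>               + "spill on every collect() beyond " + spillAlwaysAt + " partitions");
>       }
>   }
>
> Past that partition count, (keyValBuffer.getLength() + totalMem) >= maxBufferSize holds before a single key/value is buffered, so every collect() call spills.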
>
> Issue #2:
>   digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
>
>   [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
>   [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
>   [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
>
> The InetAddress.getLocalHost() call does not cache its result, so each call results in a lookup against the hosts file and possibly DNS, pushing CPU usage even higher (observed).
>
> This is a BLOCKER issue and needs immediate attention.
>
> Notes:
> 1) output.collect should not take a hit from the framework; handle the spill buffer in a separate thread?
> 2) The InetAddress.getLocalHost() result should be cached in a static variable (see the sketch after these notes)?
> 3) The spilling logic should not depend on the # of partitions; needs redesign?
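>
> For note 2), a minimal sketch of that caching (hypothetical class, not the actual 1698.patch; the digest code mirrors the SequenceFile.Writer block quoted above):
>
>   import java.net.InetAddress;
>   import java.net.UnknownHostException;
>   import java.rmi.server.UID;
>   import java.security.MessageDigest;
>   import java.security.NoSuchAlgorithmException;
>
>   public class CachedSyncMarker {
>       // Resolved once per JVM instead of once per Writer, so the
>       // hosts-file/DNS lookup happens at most one time per task.
>       private static final String LOCAL_HOST;
>       static {
>           String host;
>           try {
>               host = InetAddress.getLocalHost().toString();
>           } catch (UnknownHostException e) {
>               host = "localhost";  // degrade rather than fail writer creation
>           }
>           LOCAL_HOST = host;
>       }
>
>       // Builds the 16-byte sync marker the same way, minus the repeated lookup.
>       static byte[] newSyncMarker() throws NoSuchAlgorithmException {
>           MessageDigest digester = MessageDigest.getInstance("MD5");
>           digester.update((new UID() + "@" + LOCAL_HOST).getBytes());
>           return digester.digest();
>       }
>   }
>
> This addresses only the per-writer lookup; the spill-on-every-collect behavior of Issue #1 still needs its own fix.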