[
https://issues.apache.org/jira/browse/HADOOP-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12519119
]
Tahir Hashmi commented on HADOOP-1698:
--------------------------------------
Is there any evidence that generating a 128-bit random number makes the sync
marker any less prone to collisions? I don't think so. The probability of the
marker occurring in the data is proportional only to the amount of data in the
file; it is not influenced by the actual value of the marker.
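A back-of-envelope check supports that: a file of N bytes contains roughly N
positions at which a 16-byte window could equal the marker, so for any fixed
marker value

    expected accidental matches ≈ N * 2^-128

e.g. N = 1 TB ≈ 2^40 bytes gives about 2^-88 expected matches, and the specific
128-bit value chosen never enters the estimate.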
> 7500+ reducers/partitions causes job to hang
> --------------------------------------------
>
> Key: HADOOP-1698
> URL: https://issues.apache.org/jira/browse/HADOOP-1698
> Project: Hadoop
> Issue Type: Bug
> Components: examples
> Affects Versions: 0.13.1
> Environment: Standard hadoop installation, any number of nodes > 10
> Reporter: Srikanth Kakani
> Assignee: Devaraj Das
> Priority: Blocker
> Attachments: 1698.patch, 1698.patch
>
>
> Steps to Reproduce:
> On the cluster described above, run any job with # of partitions/reducers = 8000+.
> Observe CPU utilization on any mapper.
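> For reference, a minimal driver along these lines reproduces it. This is a
> sketch only: it assumes the 0.13-era org.apache.hadoop.mapred API, and the
> class name and input/output paths are hypothetical.
>
> import java.io.IOException;
>
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.lib.IdentityMapper;
> import org.apache.hadoop.mapred.lib.IdentityReducer;
>
> public class ManyPartitionsRepro {
>   public static void main(String[] args) throws IOException {
>     JobConf conf = new JobConf(ManyPartitionsRepro.class);
>     conf.setJobName("many-partitions-repro");
>
>     // Identity map/reduce over text input: LongWritable offsets, Text lines.
>     conf.setMapperClass(IdentityMapper.class);
>     conf.setReducerClass(IdentityReducer.class);
>     conf.setOutputKeyClass(LongWritable.class);
>     conf.setOutputValueClass(Text.class);
>
>     // The trigger: more than ~7500 partitions/reducers.
>     conf.setNumReduceTasks(8000);
>
>     conf.setInputPath(new Path(args[0]));   // any reasonably large input
>     conf.setOutputPath(new Path(args[1]));
>
>     JobClient.runJob(conf);
>   }
> }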
> Observations:
> The output.collect(Key, Value) call takes a huge amount of CPU, causing the
> job to hang.
> This is a result of two issues:
> 1) A partition count beyond 7500 results in a call to sortAndSpillToDisk()
> on every call to output.collect.
> 2) Each call to sortAndSpillToDisk creates a writer object, which eventually
> executes:
> MessageDigest digester = MessageDigest.getInstance("MD5");
> digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
> sync = digester.digest();
> This code block is at SequenceFile.java:652.
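> For illustration, the same JDK calls can be run standalone (a sketch, not the
> Hadoop source) to see the work repeated for every writer created during a spill:
>
> import java.net.InetAddress;
> import java.rmi.server.UID;
> import java.security.MessageDigest;
>
> public class SyncMarkerCost {
>   public static void main(String[] args) throws Exception {
>     long start = System.currentTimeMillis();
>     byte[] sync = null;
>     for (int i = 0; i < 1000; i++) {
>       // Each SequenceFile.Writer repeats all of this.
>       MessageDigest digester = MessageDigest.getInstance("MD5");
>       // getLocalHost() is the expensive part: it may consult the hosts
>       // file and/or DNS on every call.
>       digester.update((new UID() + "@" + InetAddress.getLocalHost()).getBytes());
>       sync = digester.digest();
>     }
>     System.out.println("1000 sync markers in "
>         + (System.currentTimeMillis() - start) + " ms, sync length = " + sync.length);
>   }
> }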
> Issue #1:
> Further investigation reveals the following stack trace whenever the task is
> suspended.
> [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
> [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
> [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
> [4] java.net.InetAddress.getLocalHost (InetAddress.java:1,312)
> [5] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:653)
> [6] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:622)
> [7] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:386)
> [8] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:412)
> [9] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.startPartition (MapTask.java:307)
> [10] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk (MapTask.java:387)
> [11] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
> /*My code*/
> [12] mypackage.MyClass$Map.map (MyClass.java:283)
> --------------
> [13] org.apache.hadoop.mapred.MapRunner.run (MapRunner.java:46)
> [14] org.apache.hadoop.mapred.MapTask.run (MapTask.java:189)
> [15] org.apache.hadoop.mapred.TaskTracker$Child.main (TaskTracker.java:1,771)
> The piece of code causing the problem is (MapTask.java:355)
> ----------------------------------------------------------
> long totalMem = 0;
> for (int i = 0; i < partitions; i++)
>   totalMem += sortImpl[i].getMemoryUtilized();
>   // <---- == 16K: startOffsets.length (below) * BUFFERED_KEY_VAL_OVERHEAD
>   //       (BasicTypeSorterBase.java:88)
> if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
>   // <---- condition is always true if partitions > 7500
>   sortAndSpillToDisk();
>   keyValBuffer.reset();
>   for (int i = 0; i < partitions; i++) {
>     sortImpl[i].close();
>   }
> }
> ----------------------------------------------------------
> Looking at the variable values in
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
> sortImpl[0] = {
>   org.apache.hadoop.mapred.BasicTypeSorterBase.keyValBuffer: instance of org.apache.hadoop.io.DataOutputBuffer(id=1159)
>   org.apache.hadoop.mapred.BasicTypeSorterBase.startOffsets: instance of int[1024] (id=1160)  <-- 1K * 16 (previously explained) == 16K
>   org.apache.hadoop.mapred.BasicTypeSorterBase.keyLengths: instance of int[1024] (id=1161)
>   org.apache.hadoop.mapred.BasicTypeSorterBase.valueLengths: instance of int[1024] (id=1162)
>   org.apache.hadoop.mapred.BasicTypeSorterBase.pointers: instance of int[1024] (id=1163)
>   org.apache.hadoop.mapred.BasicTypeSorterBase.comparator: instance of org.apache.hadoop.io.MD5Hash$Comparator(id=1164)
>   org.apache.hadoop.mapred.BasicTypeSorterBase.count: 0
>   org.apache.hadoop.mapred.BasicTypeSorterBase.BUFFERED_KEY_VAL_OVERHEAD: 16
>   org.apache.hadoop.mapred.BasicTypeSorterBase.reporter: instance of org.apache.hadoop.mapred.Task$2(id=1165)
> }
> Computation:
> maxBufferSize == 120M
> theoretical max # of partitions, assuming keyValBuffer.getLength() == 0:
> 120M / 16K ≈ 7500 partitions
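> The same arithmetic as a standalone sketch (using the constants from the dump
> above, not the Hadoop source):
>
> public class SpillThresholdMath {
>   public static void main(String[] args) {
>     final long maxBufferSize = 120L * 1024 * 1024; // 120M, from the dump above
>     final int entriesPerSorter = 1024;             // startOffsets.length
>     final int bytesPerEntry = 16;                  // BUFFERED_KEY_VAL_OVERHEAD
>     final long perPartition = entriesPerSorter * bytesPerEntry; // 16K
>
>     // Partition count at which the fixed per-partition overhead alone fills
>     // the buffer, even with an empty keyValBuffer; beyond this, every
>     // output.collect() triggers sortAndSpillToDisk().
>     System.out.println("overhead per partition = " + perPartition + " bytes");
>     System.out.println("partitions to exhaust buffer = " + (maxBufferSize / perPartition));
>     // prints 7680; with any data in keyValBuffer the practical limit is ~7500
>
>     long totalMem = 8000L * perPartition;
>     System.out.println("8000 partitions -> totalMem = " + totalMem
>         + ", spills on every collect: " + (totalMem >= maxBufferSize));
>   }
> }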
> Issue #2:
> digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
> [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
> [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
> [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
> The InetAddress.getLocalHost() call does not cache its result; each call
> triggers a lookup against the hosts file and possibly DNS, bumping CPU usage
> even higher (observed).
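> A minimal sketch of the caching suggested in note 2 below (my own sketch,
> hypothetical class name, not a patch):
>
> import java.net.InetAddress;
> import java.net.UnknownHostException;
>
> public class LocalHostCache {
>   // Resolve the local host once per JVM; assumes the host name/address
>   // does not change while the task is running.
>   private static volatile String cachedLocalHost;
>
>   public static String getLocalHostName() throws UnknownHostException {
>     String host = cachedLocalHost;
>     if (host == null) {
>       host = InetAddress.getLocalHost().toString();
>       cachedLocalHost = host;
>     }
>     return host;
>   }
> }
>
> The digest line in SequenceFile.Writer would then read something like
> digester.update((new UID() + "@" + LocalHostCache.getLocalHostName()).getBytes());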
> This is a BLOCKER issue and needs immediate attention.
> Notes:
> 1) output.collect should not take a hit from the framework; handle the spill
> buffer in a separate thread?
> 2) The InetAddress.getLocalHost() result should be cached in a static variable
> (see the sketch under Issue #2 above)?
> 3) The spilling logic should not depend on the number of partitions; needs a
> redesign?
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.