The local test is executed on a Hadoop node (when no job is running), so same
JRE/hardware.
JRE:
java version "1.6.0_13"
Java(TM) SE Runtime Environment (build 1.6.0_13-b03)
Java HotSpot(TM) 64-Bit Server VM (build 11.3-b02, mixed mode)
JVM arguments for child task:
/usr/java/jdk1.6.0_13/jre/bin/java
-Djava.library.path=/usr/lib/hadoop/lib/native/Linux-amd64-64:/disk1/hadoop/mapred/local/taskTracker/jobcache/job_200907242015_0048/attempt_200907242015_0048_m_000008_0/work
-Xmx486m
-Djava.io.tmpdir=/disk1/hadoop/mapred/local/taskTracker/jobcache/job_200907242015_0048/attempt_200907242015_0048_m_000008_0/work/tmp
-classpath
/etc/hadoop/conf:/usr/java/default/lib/tools.jar:/usr/lib/hadoop:/usr/lib/hadoop/hadoop-0.18.3-10.cloudera.CH0_3-core.jar:/usr/lib/hadoop/lib/commons-cli-2.0-SNAPSHOT.jar:/usr/lib/hadoop/lib/commons-codec-1.3.jar:/usr/lib/hadoop/lib/commons-httpclient-3.0.1.jar:/usr/lib/hadoop/lib/commons-logging-1.0.4.jar:/usr/lib/hadoop/lib/commons-logging-api-1.0.4.jar:/usr/lib/hadoop/lib/commons-net-1.4.1.jar:/usr/lib/hadoop/lib/hadoop-0.18.3-10.cloudera.CH0_3-fairscheduler.jar:/usr/lib/hadoop/lib/hadoop-0.18.3-10.cloudera.CH0_3-scribe-log4j.jar:/usr/lib/hadoop/lib/hsqldb.jar:/usr/lib/hadoop/lib/jets3t-0.6.1.jar:/usr/lib/hadoop/lib/jetty-5.1.4.jar:/usr/lib/hadoop/lib/junit-4.5.jar:/usr/lib/hadoop/lib/kfs-0.1.3.jar:/usr/lib/hadoop/lib/log4j-1.2.15.jar:/usr/lib/hadoop/lib/mysql-connector-java-5.0.8-bin.jar:/usr/lib/hadoop/lib/oro-2.0.8.jar:/usr/lib/hadoop/lib/servlet-api.jar:/usr/lib/hadoop/lib/slf4j-api-1.4.3.jar:/usr/lib/hadoop/lib/slf4j-log4j12-1.4.3.jar:/usr/lib/hadoop/lib/xmlenc-0.52.jar:/usr/lib/hadoop/lib/jetty-ext/commons-el.jar:/usr/lib/hadoop/lib/jetty-ext/jasper-compiler.jar:/usr/lib/hadoop/lib/jetty-ext/jasper-runtime.jar:/usr/lib/hadoop/lib/jetty-ext/jsp-api.jar::/disk1/hadoop/mapred/local/taskTracker/jobcache/job_200907242015_0048/jars/classes:/disk1/hadoop/mapred/local/taskTracker/jobcache/job_200907242015_0048/jars:/disk2/hadoop/mapred/local/taskTracker/archive/master.ph-cloud.detica.us.com/tmp/hadoop-hadoop/mapred/system/job_200907242015_0048/libjars/ph-gpb-1.0.5.jar/ph-gpb-1.0.5.jar:/disk2/hadoop/mapred/local/taskTracker/archive/<master-hostname>/tmp/hadoop-hadoop/mapred/system/job_200907242015_0048/libjars/protobuf-java-2.0.3.jar/protobuf-java-2.0.3.jar:/disk1/hadoop/mapred/local/taskTracker/archive/<master-hostname>/tmp/hadoop-hadoop/mapred/system/job_200907242015_0048/libjars/commons-lang-2.4.jar/commons-lang-2.4.jar:/disk1/hadoop/mapred/local/taskTracker/jobcache/job_200907242015_0048/attempt_200907242015_0048_m_000008_0/work
-Dhadoop.log.dir=/var/log/hadoop -Dhadoop.root.logger=INFO,TLA
-Dhadoop.tasklog.taskid=attempt_200907242015_0048_m_000008_0
-Dhadoop.tasklog.totalLogFileSize=0
org.apache.hadoop.mapred.TaskTracker$Child 127.0.0.1 52698
attempt_200907242015_0048_m_000008_0
child task heap: -Xmx486m
(mapred.child.ulimit is 995328)
Local call has no JVM arguments, just:
java -cp <myjar>.jar com......RecordReaderTest <fileToTest>
Hardware platform:
Linux 2.6.27.24-170.2.68.fc10.x86_64
8GB
Dual Core Xeon E5205
2 x 300GB SAS 10k, no RAID.
(HP ProLiant blade)
Data is not compressed.
JobTracker:
Running: started around 20, but as the job progressed it slowly
increased, reaching 432 at the end (when Pending was 0). Running dropped to
0 and Status was marked Succeeded about 10 seconds after that. Is this
normal? The total # of tasks was 1449.
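For a rough sense of how much of the 1449-task run could be scheduling alone, here is a minimal sketch of the estimate from the quoted message below (one new task per node roughly every 2 seconds). The function name and the 2-second default are illustrative assumptions, not something measured on this cluster:

```python
import math

def min_schedule_seconds(total_tasks, nodes, seconds_per_assignment=2.0):
    """Lower bound on the time needed to hand out every task, assuming each
    node receives at most one new task per assignment interval."""
    rounds = math.ceil(total_tasks / nodes)  # assignment rounds per node
    return rounds * seconds_per_assignment

small_files = min_schedule_seconds(1449, 10)  # the 1449-file job
large_files = min_schedule_seconds(22, 10)    # the 22-file job
print(small_files, large_files)
```

Under that assumption, handing out 1449 tasks across 10 nodes takes at least 290 seconds of the 7 min 26 sec (446 s) run, while the 22-file job needs only about 6 seconds.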
Stack traces:
Looked at about 20 stack traces from 2 different nodes. Consistently saw:
2 x org.apache.hadoop.dfs.DFSClient$LeaseChecker @ Thread.sleep()
"Comm thread for attempt_200907242015_0050_m_001409_0" @ Thread.sleep()
"IPC Client (47) connection to <master-hostname>/192.168.1.100:8020
from wkinney" @ Object.wait()
"IPC Client (47) connection to /127.0.0.1:49202 from an unknown user"
@ Object.wait()
VM, GC, Signal Dispatcher, Low Memory Detector, CompilerThread,
Finalizer, Reference Handler...
Then I would sometimes see a thread in FastDateFormat, parseFrom(), or
somewhere near there (e.g. MapRunner.run()).
Finally, I consistently saw this:
"Thread-5" daemon prio=10 tid=0x0000000040bbfc00 nid=0x2f87 in
Object.wait() [0x00007fb7498ce000..0x00007fb7498cebf0]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00007fb769fdec00> (a java.util.LinkedList)
at
org.apache.hadoop.dfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:1905)
- locked <0x00007fb769fdec00> (a java.util.LinkedList)
I'm guessing this is normal DataNode activity...
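To look for trends across many dumps rather than eyeballing them, something like the following could tally the top-most frame of each thread. This is only a sketch: it assumes the usual jstack text layout (thread blocks separated by blank lines, frames on lines starting with "at"), and the "main" thread in the sample, including its tid/nid values and line numbers, is invented for illustration:

```python
import re
from collections import Counter

def top_frames(dump_text):
    """Count the top-most stack frame of every thread in one jstack dump."""
    counts = Counter()
    # Thread blocks are assumed to be separated by blank lines.
    for block in re.split(r"\n\s*\n", dump_text.strip()):
        # The first "at ..." line in a block is the frame currently executing.
        match = re.search(r"^\s*at\s+([\w.$<>]+)\(", block, re.MULTILINE)
        if match:
            counts[match.group(1)] += 1
    return counts

SAMPLE = """
"Thread-5" daemon prio=10 tid=0x0000000040bbfc00 nid=0x2f87 in Object.wait()
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00007fb769fdec00> (a java.util.LinkedList)
    at org.apache.hadoop.dfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:1905)

"main" prio=10 tid=0x0000000000000001 nid=0x1 runnable
   java.lang.Thread.State: RUNNABLE
    at org.apache.commons.lang.time.FastDateFormat.format(FastDateFormat.java:1)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:1)
"""

counts = top_frames(SAMPLE)
for frame, n in counts.most_common():
    print(n, frame)
```

Summing the counters from a handful of dumps taken a few seconds apart gives a crude sampling profile of where the child JVMs spend their time.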
Will
On Thu, Jul 30, 2009 at 1:31 AM, Scott Carey<[email protected]> wrote:
> What is the JRE for the Hadoop nodes versus local? What are the JVM
> arguments for the child tasks and the local version (and heap size)? What is
> the hardware and platform details for the nodes versus the local test?
> Is the data compressed in Hadoop (check the config)?
>
> You mention the TaskTracker web interface during a job, but what about the
> JobTracker interface? This should show the global view of currently
> scheduled maps versus total slots.
>
> Lastly, check out some more stack traces on the tasks. Are they all still
> in the DateFormat stuff? Surely some of them should be in your parseFrom()
> method too?
>
>
> On 7/29/09 9:07 PM, "william kinney" <[email protected]> wrote:
>
> OK:
> implemented some iotop/iostat monitoring in ganglia. Looks pretty
> standard (job was 23:00 to 23:06):
> - Single Node Disk Read: http://imagebin.org/57716
> - Single Node Disk Write: http://imagebin.org/57717
>
> On each node, noticed that the two TaskTracker$Child processes were
> consuming close to 90% of each core. The TaskTracker and DataNode were
> close to 0%. For the TT children, I did jstack dumps, but didn't
> really see much that popped out other than a lot of time spent in a
> SimpleDateFormat section and the protobuf parse. I switched the SDF
> out with commons.lang FastDateFormat, which reduced the total time for
> both the Hadoop job and the local/non-hadoop test, so still a
> discrepancy between local and hadoop runs.
>
> "You can look at the logs for an individual task, and see how much data it
> read, and how long it took. It might be hitting your 50MB/sec or close in a
> burst, or perhaps not."
> - I decided to log the performance of each RecordReader use within
> hadoop, which is essentially 1:1 for TaskTracker$Child process since I
> have 1 InputSplit per file (i.e., no splitting), right? Saw:
> Example 1) 527639090 bytes in : 18050 ms. (27.8778 MB/s)
> Example 2) 533770314 bytes in : 23494 ms. (21.6669 MB/s)
> Example 3) 529711886 bytes in : 20092 ms. (25.1429 MB/s)
> ...etc
> For reference, the non-hadoop/local test:
> 530710906 bytes in : 9133 ms. (55.41721 MB/s)
>
> Regarding the JobTracker only doing 1 task / node / 2 seconds, that
> will definitely hurt. Although the above discrepancy takes priority
> for me, for now.
>
> "What does the web interface tell you about the number of concurrent map
> tasks during the run? Does it approach the max task slots?"
> - Yeah it definitely does, from the TaskTracker page on each node,
> I'm seeing almost always 2 "RUNNING" tasks (and an accumulating list
> of "COMMIT_PENDING" tasks under Non-Running, which slowly grows as the
> job progresses). Normal?
>
> Also, I used a profiler to profile a local/non-hadoop test of the
> RecordReader/Map():
> class: %Time
> org.apache.commons.lang.time.FastDateFormat.format(long): 46%
> com......parseFrom(byte[]): 42%
> java.io.FileInputStream.read(byte[], int, int): 5%
> ...rest are 1%'ish
> I guess this doesn't show anything helpful. I'll try to attach it to
> hadoop remotely...anyone have any experience doing this w/ YourKit
> Java Profiler?
>
> Anyways, decided to test the "large files" vs "small files" theory again:
> Small files (1449 files, ranging 10-100MB. average: 32 MB)
> - HDFS bytes read 49,057,491,374
> - Map input records 737,850,142
> - Finished in: 7mins, 26sec
> ... 104.898 MB/s
> Large files (22 files, around 500MB. average 514MB)
> - HDFS bytes read 11,852,421,152
> - Map input records 179,657,432
> - Finished in: 1mins, 8sec
> ... 166.225 MB/s
>
> Not sure why the large files were taking longer before, perhaps the
> SimpleDateFormat>FastDateFormat change? Anyways, good to see where I
> need to take the file sizes to...but still 166 MB/s is not the rate I
> was hoping for (given the # of nodes and local performance).
>
> So I guess in summary, hadoop TaskTracker$Child processes that are
> doing the Map() and RecordReader are about 50% slower than the normal,
> local non-hadoop version. In addition, their rate (~25MB/s) * Num
> Nodes (10) suggests ~ 250MB/s total job performance, but I'm only
> seeing ~166MB/s.
>
> Will
>
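A quick sanity check of the rates quoted above, as a minimal sketch. It assumes "MB" in these figures means 2^20 bytes, which is what reproduces the logged values; the function name is made up for illustration:

```python
MIB = 1024 * 1024  # assumption: "MB" in the logged figures is 2**20 bytes

def mib_per_second(num_bytes, millis):
    """Throughput in MiB/s for num_bytes processed in millis milliseconds."""
    return num_bytes / MIB / (millis / 1000.0)

# Per-task RecordReader timings logged inside Hadoop: (bytes, milliseconds).
hadoop_tasks = [(527639090, 18050), (533770314, 23494), (529711886, 20092)]
hadoop_rates = [mib_per_second(b, ms) for b, ms in hadoop_tasks]

local_rate = mib_per_second(530710906, 9133)          # local/non-hadoop test
small_job_rate = mib_per_second(49057491374, 446000)  # 7 min 26 sec
large_job_rate = mib_per_second(11852421152, 68000)   # 1 min 8 sec

mean_task_rate = sum(hadoop_rates) / len(hadoop_rates)
ratio = mean_task_rate / local_rate
print(hadoop_rates, local_rate, small_job_rate, large_job_rate, ratio)
```

The three logged tasks average about 24.9 MB/s, roughly 45% of the 55.4 MB/s local rate.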
> On Tue, Jul 28, 2009 at 6:35 PM, Scott Carey<[email protected]> wrote:
>> See below:
>>
>>
>> On 7/28/09 12:15 PM, "william kinney" <[email protected]> wrote:
>>
>>> Sorry, forgot to include that detail.
>>>
>>> Some data from ganglia:
>>>
>>> CPU:
>>> - on all 10 nodes, I am seeing for the life of the job 85-95% CPU
>>> usage, with about 10% of that being "System" CPU, vs "User".
>>> - Single node graph: http://imagebin.org/57520
>>> - Cluster graph: http://imagebin.org/57523
>>
>> Ok, CPU is definitely loaded. Identify which processes are primarily
>> responsible (Tasks? Datanode? Tasktracker?) You'll want to make the
>> processes eating CPU during a run spit out some stack traces to 'profile'
>> the activity. Use either the 'jstack' utility with the JDK, or do a 'kill
>> -3 <pid>' on a java process to spit out the stack trace to stdout. You'll
>> want to do this a handful of times on a single job if possible to identify
>> any trends.
>>
>>>
>>> Memory:
>>> - Memory used before job is about 0.4GB, During job it fluctuates
>>> up to 0.6GB and 0.7GB, then back down to 0.4GB. Most of the node
>>> memory (8GB) is showing as "Cached".
>>> - Single node graph: http://imagebin.org/57522
>>
>> So the OS is mostly just caching disk files in RAM.
>>
>>>
>>> Network:
>>> - IN and OUT: Each node 6-12MB/s, cumulative about 30-44MB/s.
>>> - Single node graph: http://imagebin.org/57521
>>> - Cluster graph: http://imagebin.org/57525
>>>
>>
>> That is not insignificant, but cumulative across the cluster it's not much.
>>
>>> iostat (disk) (sampled most of the nodes, below values are ranges I saw):
>>> tps: 0.41-1.27
>>> Blk_read/s: 46-58
>>> Blk_wrtn/s: 20-23
>>> (have two disks per node, both SAS, 10k RPM)
>>>
>>
>> Did you do iostat with a parameter to have it spit out more than one row?
>> By default, it spits out data averaged since boot time, like vmstat.
>> My favorite iostat params for monitoring are:
>> iostat -mx 5
>> iostat -dmx 5
>> (or 10 or 15 or 60 second intervals depending on what I'm doing) Ganglia
>> might have some I/O info -- you want both iops and some sort of bytes/sec
>> measurement.
>>
>>> ---
>>> Are those Blk_read/wrtn/s as in block size (4096?) = bytes/second?
>>>
>>
>> I think it's the 512-byte block notion, but I always use -m to put it in
>> useful units.
>>
>>> Also, from the job page (different job, same Map method, just more
>>> data...~40GB. 781 files):
>>> Map input records 629,738,080
>>> Map input bytes 41,538,992,880
>>>
>>> Anything else I can look into?
>>
>> Based on your other email:
>>
>> There are almost 800 map tasks, these seem to mostly be data local. The
>> current implementation of the JobTracker schedules rather slowly, and can at
>> best place one new task per node per 2 seconds or so on a small cluster.
>> So, with 10 servers, it will take at least 80 seconds just to schedule all
>> the tasks.
>> If each server can run 8 tasks concurrently, then if the average task
>> doesn't take somewhat longer than 16 seconds, the system will not reach full
>> utilization.
>>
>> What does the web interface tell you about the number of concurrent map
>> tasks during the run? Does it approach the max task slots?
>>
>> You can look at the logs for an individual task, and see how much data it
>> read, and how long it took. It might be hitting your 50MB/sec or close in a
>> burst, or perhaps not.
>>
>> Given the sort of bottlenecks I often see, I suspect the scheduling. But,
>> you have almost maxed CPU use, so it's probably not that. Getting stack
>> dumps to see what the processor is doing during your test will help narrow
>> it down.
>>
>>
>>>
>>> Do my original numbers (only 2x performance) jump out at you as being
>>> way off? Or is it common to see that with a setup similar to mine?
>>>
>>> I should also note that given it's a custom binary format, I do not
>>> support splitting (isSplitable() returns false). I don't think that would
>>> account for such a large discrepancy in expected performance, would it?
>>>
>>
>> If the files are all larger than the block size, it would cause a lot more
>> network activity -- but unless your switch or network is broken or not
>> gigabit -- there is a lot of capacity left in the network.
>>
>>> Thanks for the help,
>>> Will
>>>
>>>
>>> On Tue, Jul 28, 2009 at 12:58 PM, Scott Carey<[email protected]>
>>> wrote:
>>>> Well, the first thing to do in any performance bottleneck investigation is
>>>> to look at the machine hardware resource usage.
>>>>
>>>> During your test, what is the CPU use and disk usage? What about network
>>>> utilization?
>>>> Top, vmstat, iostat, and some network usage monitoring would be useful. It
>>>> could be many things causing your lack of scalability, but without actually
>>>> monitoring your machines to see if there is an obvious bottleneck it's just
>>>> random guessing and hunches.
>>>>
>>>>
>>>>
>>>> On 7/28/09 8:18 AM, "william kinney" <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks in advance for the help!
>>>>>
>>>>> I have a performance question relating to how fast I can expect Hadoop
>>>>> to scale. Running Cloudera's 0.18.3-10.
>>>>>
>>>>> I have custom binary format, which is just Google Protocol Buffer
>>>>> (protobuf) serialized data:
>>>>>
>>>>> 669 files, ~30GB total size (ranging 10MB to 100MB each).
>>>>> 128MB block size.
>>>>> 10 Hadoop Nodes.
>>>>>
>>>>> I tested my InputFormat and RecordReader for my input format, and it
>>>>> showed about 56MB/s performance (single thread, no hadoop, passed in
>>>>> test file via FileInputStream instead of FSDataInputStream) on
>>>>> hardware similar to what I have in my cluster.
>>>>> I also then tested some simple Map logic along w/ the above, and got
>>>>> around 54MB/s. I believe that difference can be accounted for by parsing
>>>>> the protobuf data into java objects.
>>>>>
>>>>> Anyways, when I put this logic into a job that has
>>>>> - no reduce (.setNumReduceTasks(0);)
>>>>> - no emit
>>>>> - just protobuf parsing calls (like above)
>>>>>
>>>>> I get a finish time of 10mins, 25sec, which is about 106.24 MB/s.
>>>>>
>>>>> So my question, why is the rate only 2x what I see on a single thread,
>>>>> non-hadoop test? Would it not be:
>>>>> 54MB/s x 10 (Num Nodes) - small hadoop overhead ?
>>>>>
>>>>> Is there any area of my configuration I should look into for tuning?
>>>>>
>>>>> Any way I could get more accurate performance monitoring of my job?
>>>>>
>>>>> On a side note, I tried the same job after combining the files into
>>>>> about 11 files (still 30GB in size), and actually saw a decrease in
>>>>> performance (~90MB/s).
>>>>>
>>>>> Any help is appreciated. Thanks!
>>>>>
>>>>> Will
>>>>>
>>>>> some hadoop-site.xml values:
>>>>> dfs.replication 3
>>>>> io.file.buffer.size 65536
>>>>> dfs.datanode.handler.count 3
>>>>> mapred.tasktracker.map.tasks.maximum 6
>>>>> dfs.namenode.handler.count 5
>>>>>
>>>>
>>>>
>>>
>>
>>
>
>