DFS Scalability: high CPU usage in choosing replication targets and file open
-----------------------------------------------------------------------------

                 Key: HADOOP-1073
                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
             Project: Hadoop
          Issue Type: Bug
          Components: dfs
            Reporter: dhruba borthakur


I have a test cluster with about 1600 datanodes. randomWriter fails to run 
because map tasks fail with a "connection timeout" message. The namenode 
quickly reaches 100% CPU usage. 

The positives first:
1. Datanodes continue to heartbeat and there are no cascading failures.
2. chooseRandom() does not use much CPU and is very lightweight.


An analysis of the namenode shows the following:

1. High CPU usage in FSNamesystem.getPipeline().
2. Moderate CPU usage in FSNamesystem.sortByDistance().

getPipeline() is used by chooseTarget() to sort a list of target datanodes 
based on their distances from the writer. sortByDistance() is used by an open() 
call to arrange the list of datanodes so that the datanode closest to the 
reader is first in the list.

I have two proposals to address this problem. Please comment.

Proposal 1: Optimize getDistance()
----------------------------------
In the current implementation, each datanode has a network path associated with 
it, for example "/default-rack/74.6.138.207:50010". The method getDistance() 
splits the network pathname (using "/") and then does string compares to 
determine the nearest common ancestor of two given nodes. One optimization 
would be to avoid string splits and comparisons while determining the distance 
between two nodes.
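
To make the cost concrete, here is a minimal sketch of a path-based distance 
computation of the kind described above. It is not the actual Hadoop code: the 
class name PathDistanceSketch and the host names in the comments are 
hypothetical, and it assumes every path starts with "/".

```java
// Hypothetical sketch of a string-based distance computation.
// Not the real FSNamesystem/NetworkTopology code.
final class PathDistanceSketch {

    /**
     * Returns the number of hops between two nodes identified by their
     * network paths, e.g. "/default-rack/host1:50010".
     * Assumes both paths start with "/".
     */
    static int getDistance(String pathA, String pathB) {
        // Each call allocates two arrays plus one substring per component.
        String[] a = pathA.substring(1).split("/");
        String[] b = pathB.substring(1).split("/");

        // Count leading components shared by both paths
        // (this locates the nearest common ancestor).
        int common = 0;
        while (common < a.length && common < b.length
                && a[common].equals(b[common])) {
            common++;
        }

        // Hops from each node up to the common ancestor.
        return (a.length - common) + (b.length - common);
    }
}
```

Every call performs two splits and several string comparisons, which adds up 
when sorting many datanode lists on a busy namenode.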

Instead, we can maintain the "height" at which a node is located in the network 
topology tree, with the root node at height 0. Also, from each node, including 
each InnerNode, we maintain a direct reference to the parent node. If the two 
nodes are at the same height, move each node to its parent until we reach a 
common parent. The distance between the two nodes is then 2x, where x is the 
distance to the common parent. If the nodes are at different heights to begin 
with, repeatedly move the node at the greater height to its parent until both 
nodes are at the same height, and then continue as before, adding the number 
of these extra steps to the distance.
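
The parent-pointer walk described above could look roughly like the following 
sketch. The class name TopoNode and its fields are hypothetical, chosen only 
for illustration, and the code assumes both nodes belong to the same tree.

```java
// Hypothetical sketch of the proposed height/parent-based distance.
final class TopoNode {
    final String name;
    final TopoNode parent; // null for the root
    final int level;       // "height" in the proposal: root is 0

    TopoNode(String name, TopoNode parent) {
        this.name = name;
        this.parent = parent;
        this.level = (parent == null) ? 0 : parent.level + 1;
    }

    /** Number of hops between a and b; assumes both are in the same tree. */
    static int getDistance(TopoNode a, TopoNode b) {
        int distance = 0;

        // Bring the deeper node up until both are at the same level.
        while (a.level > b.level) { a = a.parent; distance++; }
        while (b.level > a.level) { b = b.parent; distance++; }

        // Move both up in lockstep until they meet at the common ancestor.
        while (a != b) {
            a = a.parent;
            b = b.parent;
            distance += 2;
        }
        return distance;
    }
}
```

This version does no string splitting or string comparison; it only follows 
references and compares integers.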

Also, the calls to checkArgument() from getDistance() may be removed, and the 
call to getPipeline() may be made outside the global FSNamesystem lock.


Proposal 2: Distribute the workload to the DFSClient
----------------------------------------------------
The dfsclient downloads the network topology from the namenode and caches it in 
memory. When a new block needs to be allocated, the namenode sends a list of 
unsorted datanodes to the client. The client sorts them based on the cached 
network topology map. Similarly, when a file is opened, the namenode sends the 
blocks that comprise the file with their datanode lists unsorted. The dfsclient 
sorts them and uses them appropriately. The topology map can be compacted into 
maybe a 1 MB buffer for a 10000-node system.
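
As a rough illustration of the client-side sort, the sketch below assumes the 
cached topology is a simple map from datanode name to rack name and that the 
topology has only two levels (rack, then datanode). The class 
ClientSideSortSketch, its methods, and the node names used with it are all 
hypothetical; they are not part of the DFSClient API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: sorting datanodes on the client using a cached topology.
final class ClientSideSortSketch {
    // Assumed cache format: datanode name -> rack name.
    private final Map<String, String> cachedRackOf;

    ClientSideSortSketch(Map<String, String> cachedRackOf) {
        this.cachedRackOf = cachedRackOf;
    }

    /** Distance in a two-level topology: 0 same node, 2 same rack, 4 otherwise. */
    int distance(String a, String b) {
        if (a.equals(b)) {
            return 0;
        }
        String rackA = cachedRackOf.get(a);
        String rackB = cachedRackOf.get(b);
        // Nodes missing from the cache are treated as being off-rack.
        return (rackA != null && rackA.equals(rackB)) ? 2 : 4;
    }

    /** Returns a copy of the list ordered by distance from localNode. */
    List<String> sortByDistance(String localNode, List<String> datanodes) {
        List<String> sorted = new ArrayList<>(datanodes);
        // List.sort is stable, so ties keep the order sent by the namenode.
        sorted.sort(Comparator.comparingInt((String d) -> distance(localNode, d)));
        return sorted;
    }
}
```

With this split, the namenode only has to return the list; the per-request 
sorting cost moves to the many clients.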

If the network topology is very big, then another option would be to have a set 
of topology servers (each holding a cached copy of the network topology); the 
dfsclient contacts one of them to sort its list of target datanodes.





