Author: hairong
Date: Mon Aug 4 16:03:46 2008
New Revision: 682536
URL: http://svn.apache.org/viewvc?rev=682536&view=rev
Log:
HADOOP-3620. Namenode should synchronously resolve a datanode's network
location when the datanode registers. Contributed by Hairong Kuang.
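
In short: instead of queueing new datanodes on a ResolutionMonitor thread and asking for a block report once the rack is known, the namenode now calls DNSToSwitchMapping.resolve() inline while handling registerDatanode(), so every node enters the network topology with its rack already set (falling back to /default-rack if resolution fails). A minimal, self-contained sketch of that flow, using simplified stand-ins for the real classes touched in this diff:

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;

  public class SyncResolutionSketch {
    // Simplified stand-in for org.apache.hadoop.net.DNSToSwitchMapping.
    interface Mapping {
      List<String> resolve(List<String> names);
    }

    static final String DEFAULT_RACK = "/default-rack";

    // Registration resolves the rack inline and never leaves a node "unresolved".
    static String register(String dnHost, Mapping mapping) {
      List<String> rName = mapping.resolve(Arrays.asList(dnHost));
      // FSNamesystem would then call clusterMap.add(node) with this location set.
      return (rName == null || rName.isEmpty()) ? DEFAULT_RACK : rName.get(0);
    }

    public static void main(String[] args) {
      Mapping oneRack = new Mapping() {
        public List<String> resolve(List<String> names) {
          List<String> racks = new ArrayList<String>(names.size());
          for (int i = 0; i < names.size(); i++) {
            racks.add("/rack1");
          }
          return racks;
        }
      };
      System.out.println("10.0.0.1 -> " + register("10.0.0.1", oneRack));
    }
  }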
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
hadoop/core/trunk/src/core/org/apache/hadoop/net/NetworkTopology.java
hadoop/core/trunk/src/core/org/apache/hadoop/net/ScriptBasedMapping.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/NNThroughputBenchmark.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Aug 4 16:03:46 2008
@@ -134,6 +134,9 @@
HADOOP-3694. Improve unit test performance by changing
MiniDFSCluster to listen only on 127.0.0.1. (cutting)
+ HADOOP-3620. Namenode should synchronously resolve a datanode's network
+ location when the datanode registers. (hairong)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Mon Aug 4 16:03:46 2008
@@ -1257,7 +1257,7 @@
<property>
<name>topology.script.number.args</name>
- <value>20</value>
+ <value>100</value>
<description> The max number of args that the script configured with
topology.script.file.name should be run with. Each arg is an
IP address.
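
The larger default matters because the script configured via topology.script.file.name is forked once per batch of at most topology.script.number.args addresses; raising the limit from 20 to 100 cuts the number of forks when the namenode resolves a long include list up front. A rough sketch of the batching arithmetic (illustrative only; the method and variable names below are not Hadoop's):

  import java.util.ArrayList;
  import java.util.List;

  public class TopologyScriptBatching {
    // Illustrative only: split resolve() arguments into batches of at most
    // maxArgs, the way runResolveCommand() invokes the script once per batch.
    static List<List<String>> batches(List<String> args, int maxArgs) {
      List<List<String>> out = new ArrayList<List<String>>();
      for (int start = 0; start < args.size(); start += maxArgs) {
        int end = Math.min(start + maxArgs, args.size());
        out.add(new ArrayList<String>(args.subList(start, end)));
      }
      return out;
    }

    public static void main(String[] args) {
      List<String> ips = new ArrayList<String>();
      for (int i = 1; i <= 250; i++) {
        ips.add("10.0.0." + i);
      }
      // 250 addresses: 13 script invocations with the old default of 20,
      // but only 3 with the new default of 100.
      System.out.println(batches(ips, 20).size() + " vs " + batches(ips, 100).size());
    }
  }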
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java Mon Aug 4 16:03:46 2008
@@ -20,9 +20,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
+import java.net.UnknownHostException;
import java.util.Map.Entry;
import java.util.*;
@@ -366,4 +368,41 @@
return (socket.getChannel() == null) ?
socket.getOutputStream() : new SocketOutputStream(socket, timeout);
}
+
+ /**
+ * Given a string representation of a host, return its ip address
+ * in textual presentation.
+ *
+ * @param name a string representation of a host:
+ * either a textual representation its IP address or its host name
+ * @return its IP address in the string format
+ */
+ public static String normalizeHostName(String name) {
+ if (Character.digit(name.charAt(0), 16) != -1) { // it is an IP
+ return name;
+ } else {
+ try {
+ InetAddress ipAddress = InetAddress.getByName(name);
+ return ipAddress.getHostAddress();
+ } catch (UnknownHostException e) {
+ return name;
+ }
+ }
+ }
+
+ /**
+ * Given a collection of string representation of hosts, return a list of
+ * corresponding IP addresses in the textual representation.
+ *
+ * @param names a collection of string representations of hosts
+ * @return a list of corresponding IP addresses in the string format
+ * @see #normalizeHostName(String)
+ */
+ public static List<String> normalizeHostNames(Collection<String> names) {
+ List<String> hostNames = new ArrayList<String>(names.size());
+ for (String name : names) {
+ hostNames.add(normalizeHostName(name));
+ }
+ return hostNames;
+ }
}
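
The two new helpers give callers a uniform textual-IP form whether they start from a dotted-quad string or a hostname. A small usage sketch against the signatures added above (it assumes the Hadoop core classes are on the classpath; the host values are arbitrary):

  import java.util.Arrays;
  import java.util.List;
  import org.apache.hadoop.net.NetUtils;

  public class NormalizeHostNameExample {
    public static void main(String[] args) {
      // An IP literal comes back unchanged; a hostname is looked up and returned
      // as its textual IP address (or unchanged if the lookup fails).
      System.out.println(NetUtils.normalizeHostName("10.1.2.3"));
      System.out.println(NetUtils.normalizeHostName("localhost"));   // e.g. 127.0.0.1

      List<String> mixed = Arrays.asList("10.1.2.3", "localhost");
      System.out.println(NetUtils.normalizeHostNames(mixed));
    }
  }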
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/NetworkTopology.java Mon Aug 4 16:03:46 2008
@@ -38,7 +38,6 @@
*/
public class NetworkTopology {
public final static String DEFAULT_RACK = "/default-rack";
- public final static String UNRESOLVED = "";
public final static int DEFAULT_HOST_LEVEL = 2;
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.net.NetworkTopology");
@@ -344,11 +343,6 @@
*/
public void remove(Node node) {
if (node==null) return;
- if (NetworkTopology.UNRESOLVED.equals(node.getNetworkLocation())) {
- // the node's network location has not resolved yet;
- // so it is not in the network topology
- return;
- }
if( node instanceof InnerNode ) {
throw new IllegalArgumentException(
"Not allow to remove an inner node: "+NodeBase.getPath(node));
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/ScriptBasedMapping.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/ScriptBasedMapping.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/ScriptBasedMapping.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/ScriptBasedMapping.java Mon Aug 4 16:03:46 2008
@@ -20,7 +20,6 @@
import java.util.*;
import java.io.*;
-import java.net.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,66 +31,70 @@
* This class implements the {@link DNSToSwitchMapping} interface using a
* script configured via topology.script.file.name .
*/
-public final class ScriptBasedMapping implements Configurable,
-DNSToSwitchMapping
+public final class ScriptBasedMapping extends CachedDNSToSwitchMapping
+implements Configurable
{
+ public ScriptBasedMapping() {
+ super(new RawScriptBasedMapping());
+ }
+
+ public ScriptBasedMapping(Configuration conf) {
+ this();
+ setConf(conf);
+ }
+
+ public Configuration getConf() {
+ return ((RawScriptBasedMapping)rawMapping).getConf();
+ }
+
+ public void setConf(Configuration conf) {
+ ((RawScriptBasedMapping)rawMapping).setConf(conf);
+ }
+
+ private static final class RawScriptBasedMapping
+ implements DNSToSwitchMapping {
private String scriptName;
private Configuration conf;
private int maxArgs; //max hostnames per call of the script
- private Map<String, String> cache = new TreeMap<String, String>();
private static Log LOG =
LogFactory.getLog("org.apache.hadoop.net.ScriptBasedMapping");
public void setConf (Configuration conf) {
this.scriptName = conf.get("topology.script.file.name");
- this.maxArgs = conf.getInt("topology.script.number.args", 20);
+ this.maxArgs = conf.getInt("topology.script.number.args", 100);
this.conf = conf;
}
public Configuration getConf () {
return conf;
}
-
- public ScriptBasedMapping() {}
+
+ public RawScriptBasedMapping() {}
public List<String> resolve(List<String> names) {
List <String> m = new ArrayList<String>(names.size());
+ if (names.isEmpty()) {
+ return m;
+ }
+
if (scriptName == null) {
for (int i = 0; i < names.size(); i++) {
m.add(NetworkTopology.DEFAULT_RACK);
}
return m;
}
- List<String> hosts = new ArrayList<String>(names.size());
- for (String name : names) {
- name = getHostName(name);
- if (cache.get(name) == null) {
- hosts.add(name);
- }
- }
- int i = 0;
- String output = runResolveCommand(hosts);
+ String output = runResolveCommand(names);
if (output != null) {
StringTokenizer allSwitchInfo = new StringTokenizer(output);
while (allSwitchInfo.hasMoreTokens()) {
String switchInfo = allSwitchInfo.nextToken();
- cache.put(hosts.get(i++), switchInfo);
- }
- }
- for (String name : names) {
- //now everything is in the cache
- name = getHostName(name);
- if (cache.get(name) != null) {
- m.add(cache.get(name));
- } else { //resolve all or nothing
- return null;
+ m.add(switchInfo);
}
}
return m;
}
private String runResolveCommand(List<String> args) {
- InetAddress ipaddr = null;
int loopCount = 0;
if (args.size() == 0) {
return null;
@@ -104,12 +107,7 @@
cmdList.add(scriptName);
for (numProcessed = start; numProcessed < (start + maxArgs) &&
numProcessed < args.size(); numProcessed++) {
- try {
- ipaddr = InetAddress.getByName(args.get(numProcessed));
- } catch (UnknownHostException uh) {
- return null;
- }
- cmdList.add(ipaddr.getHostAddress());
+ cmdList.add(args.get(numProcessed));
}
File dir = null;
String userDir;
@@ -129,11 +127,5 @@
}
return allOutput.toString();
}
- private String getHostName(String hostWithPort) {
- int j;
- if ((j = hostWithPort.indexOf(':')) != -1) {
- hostWithPort = hostWithPort.substring(0, j);
- }
- return hostWithPort;
}
}
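
ScriptBasedMapping now only shells out to the topology script; the per-host caching it used to do is delegated to CachedDNSToSwitchMapping, whose source is not part of this diff. The sketch below is only a guess at the shape of that delegation — the class, field, and method names are hypothetical, not the actual Hadoop implementation:

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  // Hypothetical sketch of a cache-in-front-of-raw-mapping wrapper; the real
  // CachedDNSToSwitchMapping is not shown in this commit's diff.
  class CachingMappingSketch {
    interface Mapping { List<String> resolve(List<String> names); }

    private final Mapping rawMapping;
    private final Map<String, String> cache = new HashMap<String, String>();

    CachingMappingSketch(Mapping rawMapping) { this.rawMapping = rawMapping; }

    List<String> resolve(List<String> names) {
      // Ask the raw mapping only for names it has never seen before.
      List<String> misses = new ArrayList<String>();
      for (String n : names) {
        if (!cache.containsKey(n)) misses.add(n);
      }
      if (!misses.isEmpty()) {
        List<String> racks = rawMapping.resolve(misses);
        if (racks == null || racks.size() != misses.size()) {
          return null;               // resolve all or nothing, as the old code did
        }
        for (int i = 0; i < misses.size(); i++) {
          cache.put(misses.get(i), racks.get(i));
        }
      }
      List<String> result = new ArrayList<String>(names.size());
      for (String n : names) {
        result.add(cache.get(n));
      }
      return result;
    }
  }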
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Mon Aug 4 16:03:46 2008
@@ -43,7 +43,7 @@
protected long remaining;
protected long lastUpdate;
protected int xceiverCount;
- protected String location = NetworkTopology.UNRESOLVED;
+ protected String location = NetworkTopology.DEFAULT_RACK;
/** HostName as suplied by the datanode during registration as its
* name. Namenode uses datanode IP address as the name.
@@ -146,8 +146,7 @@
long r = getRemaining();
long u = getDfsUsed();
buffer.append("Name: "+name+"\n");
- if (!NetworkTopology.UNRESOLVED.equals(location) &&
- !NetworkTopology.DEFAULT_RACK.equals(location)) {
+ if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
buffer.append("Rack: "+location+"\n");
}
buffer.append("Decommission Status : ");
@@ -173,8 +172,7 @@
long r = getRemaining();
long u = getDfsUsed();
buffer.append(name);
- if (!NetworkTopology.UNRESOLVED.equals(location) &&
- !NetworkTopology.DEFAULT_RACK.equals(location)) {
+ if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
buffer.append(" "+location);
}
if (isDecommissioned()) {
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Aug 4 16:03:46 2008
@@ -136,10 +136,9 @@
ThreadGroup threadGroup = null;
long blockReportInterval;
//disallow the sending of BR before instructed to do so
- long lastBlockReport = Long.MAX_VALUE;
+ long lastBlockReport = 0;
boolean resetBlockReportTime = true;
long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
- private boolean waitForFirstBlockReportRequest = false;
long lastHeartbeat = 0;
long heartBeatInterval;
private DataStorage storage = null;
@@ -533,7 +532,9 @@
+ dnRegistration.getStorageID()
+ ". Expecting " + storage.getStorageID());
}
- waitForFirstBlockReportRequest = true;
+
+ // random short delay - helps scatter the BR from all DNs
+ scheduleBlockReport(initialBlockReportDelay);
}
/**
@@ -834,19 +835,6 @@
// start distributed upgrade here
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
- case DatanodeProtocol.DNA_BLOCKREPORT:
- // only send BR when receive request the 1st time
- if (waitForFirstBlockReportRequest) {
- LOG.info("DatanodeCommand action: DNA_BLOCKREPORT - scheduled");
- // dropping all following BR requests
- waitForFirstBlockReportRequest = false;
- // random short delay - helps scatter the BR from all DNs
- scheduleBlockReport(initialBlockReportDelay);
- } else {
- LOG.info("DatanodeCommand action: DNA_BLOCKREPORT" +
- "- ignored becaused one is already scheduled");
- }
- break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
break;
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Mon Aug 4 16:03:46 2008
@@ -97,8 +97,6 @@
/** A set of blocks to be invalidated by this datanode */
private Set<Block> invalidateBlocks = new TreeSet<Block>();
- boolean processedBlockReport = false;
-
/* Variables for maintaning number of blocks scheduled to be written to
* this datanode. This count is approximate and might be slightly higger
* in case of errors (e.g. datanode does not report if an error occurs
@@ -307,21 +305,6 @@
}
}
- /**
- * Set the bit signifying that the first block report from this datanode has been
- * processed
- */
- void setBlockReportProcessed(boolean val) {
- processedBlockReport = val;
- }
-
- /**
- * Have we processed any block report from this datanode yet
- */
- boolean getBlockReportProcessed() {
- return processedBlockReport;
- }
-
BlockCommand getReplicationCommand(int maxTransfers) {
List<BlockTargetPair> blocktargetlist = replicateBlocks.poll(maxTransfers);
return blocktargetlist == null? null:
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Aug 4 16:03:46 2008
@@ -34,6 +34,7 @@
import org.apache.hadoop.util.*;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@@ -62,7 +63,6 @@
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -221,7 +221,6 @@
public Daemon lmthread = null; // LeaseMonitor thread
Daemon smmthread = null; // SafeModeMonitor thread
public Daemon replthread = null; // Replication thread
- Daemon resthread = null; //ResolutionMonitor thread
volatile boolean fsRunning = true;
long systemStart = 0;
@@ -260,8 +259,6 @@
// datanode networktoplogy
NetworkTopology clusterMap = new NetworkTopology();
private DNSToSwitchMapping dnsToSwitchMapping;
- private LinkedBlockingQueue<DatanodeDescriptor> resolutionQueue =
- new LinkedBlockingQueue <DatanodeDescriptor>();
// for block replicas placement
ReplicationTargetChooser replicator;
@@ -318,11 +315,9 @@
this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(leaseManager.createMonitor());
this.replthread = new Daemon(new ReplicationMonitor());
- this.resthread = new Daemon(new ResolutionMonitor());
hbthread.start();
lmthread.start();
replthread.start();
- resthread.start();
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
conf.get("dfs.hosts.exclude",""));
@@ -332,6 +327,15 @@
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass("topology.node.switch.mapping.impl",
ScriptBasedMapping.class,
DNSToSwitchMapping.class), conf);
+
+ /* If the dns to swith mapping supports cache, resolve network
+ * locations of those hosts in the include list,
+ * and store the mapping in the cache; so future calls to resolve
+ * will be fast.
+ */
+ if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
+ dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
+ }
String infoAddr =
NetUtils.getServerAddress(conf, "dfs.info.bindAddress",
@@ -484,7 +488,6 @@
if (infoServer != null) infoServer.stop();
if (hbthread != null) hbthread.interrupt();
if (replthread != null) replthread.interrupt();
- if (resthread != null) resthread.interrupt();
if (dnthread != null) dnthread.interrupt();
if (smmthread != null) smmthread.interrupt();
} catch (InterruptedException ie) {
@@ -1868,79 +1871,6 @@
return dir.getListing(src);
}
- public void addToResolutionQueue(DatanodeDescriptor d) {
- while (!resolutionQueue.add(d)) {
- LOG.warn("Couldn't add to the Resolution queue now. Will " +
- "try again");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ie) {}
- }
- }
-
- private class ResolutionMonitor implements Runnable {
- public void run() {
- try {
- while (fsRunning) {
- try {
- List <DatanodeDescriptor> datanodes =
- new ArrayList<DatanodeDescriptor>(resolutionQueue.size());
- // Block if the queue is empty
- datanodes.add(resolutionQueue.take());
- resolutionQueue.drainTo(datanodes);
- List<String> dnHosts = new ArrayList<String>(datanodes.size());
- for (DatanodeDescriptor d : datanodes) {
- dnHosts.add(d.getName());
- }
- List<String> rName = dnsToSwitchMapping.resolve(dnHosts);
- if (rName == null) {
- LOG.error("The resolve call returned null! Using " +
- NetworkTopology.DEFAULT_RACK + " for some hosts");
- rName = new ArrayList<String>(dnHosts.size());
- for (int i = 0; i < dnHosts.size(); i++) {
- rName.add(NetworkTopology.DEFAULT_RACK);
- }
- }
- int i = 0;
- for (String m : rName) {
- DatanodeDescriptor d = datanodes.get(i++);
- d.setNetworkLocation(m);
- clusterMap.add(d);
- }
- } catch (InterruptedException e) {
- FSNamesystem.LOG.debug("ResolutionMonitor thread received " +
- "InterruptException. " + e);
- }
- }
- } catch (Exception e) {
- FSNamesystem.LOG.error(StringUtils.stringifyException(e));
- }
- }
- }
-
- /**
- * Has the block report of the datanode represented by nodeReg been processed
- * yet.
- * @param nodeReg
- * @return true or false
- */
- synchronized boolean blockReportProcessed(DatanodeRegistration nodeReg)
- throws IOException {
- return getDatanode(nodeReg).getBlockReportProcessed();
- }
-
- /**
- * Has the datanode been resolved to a switch/rack
- */
- synchronized boolean isResolved(DatanodeRegistration dnReg) {
- try {
- return !getDatanode(dnReg).getNetworkLocation()
- .equals(NetworkTopology.UNRESOLVED);
- } catch (IOException ie) {
- return false;
- }
- }
-
/////////////////////////////////////////////////////////
//
// These methods are called by datanodes
@@ -1971,11 +1901,6 @@
*/
public synchronized void registerDatanode(DatanodeRegistration nodeReg
) throws IOException {
-
- if (!verifyNodeRegistration(nodeReg)) {
- throw new DisallowedDatanodeException(nodeReg);
- }
-
String dnAddress = Server.getRemoteAddress();
if (dnAddress == null) {
// Mostly called inside an RPC.
@@ -1983,6 +1908,11 @@
dnAddress = nodeReg.getHost();
}
+ // check if the datanode is allowed to be connect to the namenode
+ if (!verifyNodeRegistration(nodeReg, dnAddress)) {
+ throw new DisallowedDatanodeException(nodeReg);
+ }
+
String hostName = nodeReg.getHost();
// update the datanode's name with ip:port
@@ -2038,9 +1968,10 @@
clusterMap.remove(nodeS);
nodeS.updateRegInfo(nodeReg);
nodeS.setHostName(hostName);
- nodeS.setNetworkLocation(NetworkTopology.UNRESOLVED);
- nodeS.setBlockReportProcessed(false);
- addToResolutionQueue(nodeS);
+
+ // resolve network location
+ resolveNetworkLocation(nodeS);
+ clusterMap.add(nodeS);
// also treat the registration message as a heartbeat
synchronized(heartbeats) {
@@ -2065,9 +1996,10 @@
}
// register new datanode
DatanodeDescriptor nodeDescr
- = new DatanodeDescriptor(nodeReg, NetworkTopology.UNRESOLVED, hostName);
+ = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
+ resolveNetworkLocation(nodeDescr);
unprotectedAddDatanode(nodeDescr);
- addToResolutionQueue(nodeDescr);
+ clusterMap.add(nodeDescr);
// also treat the registration message as a heartbeat
synchronized(heartbeats) {
@@ -2079,6 +2011,33 @@
return;
}
+ /* Resolve a node's network location */
+ private void resolveNetworkLocation (DatanodeDescriptor node) {
+ List<String> names = new ArrayList<String>(1);
+ if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
+ // get the node's IP address
+ names.add(node.getHost());
+ } else {
+ // get the node's host name
+ String hostName = node.getHostName();
+ int colon = hostName.indexOf(":");
+ hostName = (colon==-1)?hostName:hostName.substring(0,colon);
+ names.add(hostName);
+ }
+
+ // resolve its network location
+ List<String> rName = dnsToSwitchMapping.resolve(names);
+ String networkLocation;
+ if (rName == null) {
+ LOG.error("The resolve call returned null! Using " +
+ NetworkTopology.DEFAULT_RACK + " for host " + names);
+ networkLocation = NetworkTopology.DEFAULT_RACK;
+ } else {
+ networkLocation = rName.get(0);
+ }
+ node.setNetworkLocation(networkLocation);
+ }
+
/**
* Get registrationID for datanodes based on the namespaceID.
*
@@ -2171,16 +2130,6 @@
}
}
- // If the datanode has (just) been resolved and we haven't ever processed
- // a block report from it yet, ask for one now.
- if (!blockReportProcessed(nodeReg)) {
- // If we never processed a block report from this datanode, we shouldn't
- // have any work for that as well
- assert(cmd == null);
- if (isResolved(nodeReg)) {
- return DatanodeCommand.BLOCKREPORT;
- }
- }
//check distributed upgrade
if (cmd == null) {
cmd = getDistributedUpgradeCommand();
@@ -2717,13 +2666,6 @@
throw new DisallowedDatanodeException(node);
}
- if (node.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
- LOG.info("Ignoring block report from " + nodeID.getName() +
- " because rack location for this datanode is still to be
resolved.");
- return; //drop the block report if the dn hasn't been resolved
- }
-
- node.setBlockReportProcessed(true);
//
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
@@ -3526,22 +3468,23 @@
}
/**
- * Keeps track of which datanodes are allowed to connect to the namenode.
+ * Keeps track of which datanodes/ipaddress are allowed to connect to the namenode.
*/
- private boolean inHostsList(DatanodeID node) {
+ private boolean inHostsList(DatanodeID node, String ipAddr) {
Set<String> hostsList = hostsReader.getHosts();
return (hostsList.isEmpty() ||
- hostsList.contains(node.getName()) ||
+ (ipAddr != null && hostsList.contains(ipAddr)) ||
hostsList.contains(node.getHost()) ||
+ hostsList.contains(node.getName()) ||
((node instanceof DatanodeInfo) &&
hostsList.contains(((DatanodeInfo)node).getHostName())));
}
-
-
- private boolean inExcludedHostsList(DatanodeID node) {
+
+ private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
Set<String> excludeList = hostsReader.getExcludedHosts();
- return (excludeList.contains(node.getName()) ||
+ return ((ipAddr != null && excludeList.contains(ipAddr)) ||
excludeList.contains(node.getHost()) ||
+ excludeList.contains(node.getName()) ||
((node instanceof DatanodeInfo) &&
excludeList.contains(((DatanodeInfo)node).getHostName())));
}
@@ -3569,10 +3512,10 @@
it.hasNext();) {
DatanodeDescriptor node = it.next();
// Check if not include.
- if (!inHostsList(node)) {
+ if (!inHostsList(node, null)) {
node.setDecommissioned(); // case 2.
} else {
- if (inExcludedHostsList(node)) {
+ if (inExcludedHostsList(node, null)) {
if (!node.isDecommissionInProgress() &&
!node.isDecommissioned()) {
startDecommission(node); // case 3.
@@ -3602,12 +3545,12 @@
* Returns TRUE if node is registered (including when it is on the
* exclude list and is being decommissioned).
*/
- public synchronized boolean verifyNodeRegistration(DatanodeRegistration nodeReg)
+ private synchronized boolean verifyNodeRegistration(DatanodeRegistration nodeReg, String ipAddr)
throws IOException {
- if (!inHostsList(nodeReg)) {
+ if (!inHostsList(nodeReg, ipAddr)) {
return false;
}
- if (inExcludedHostsList(nodeReg)) {
+ if (inExcludedHostsList(nodeReg, ipAddr)) {
DatanodeDescriptor node = getDatanode(nodeReg);
if (node == null) {
throw new IOException("verifyNodeRegistration: unknown datanode " +
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Aug 4 16:03:46 2008
@@ -136,7 +136,7 @@
String portString = port == DEFAULT_PORT ? "" : (":"+port);
return URI.create("hdfs://"+ namenode.getHostName()+portString);
}
-
+
/**
* Initialize the server
*
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeCommand.java Mon Aug 4 16:03:46 2008
@@ -28,12 +28,6 @@
public void write(DataOutput out) {}
}
- static class BlockReport extends DatanodeCommand {
- private BlockReport() {super(DatanodeProtocol.DNA_BLOCKREPORT);}
- public void readFields(DataInput in) {}
- public void write(DataOutput out) {}
- }
-
static class Finalize extends DatanodeCommand {
private Finalize() {super(DatanodeProtocol.DNA_FINALIZE);}
public void readFields(DataInput in) {}
@@ -45,10 +39,6 @@
new WritableFactory() {
public Writable newInstance() {return new Register();}
});
- WritableFactories.setFactory(BlockReport.class,
- new WritableFactory() {
- public Writable newInstance() {return new BlockReport();}
- });
WritableFactories.setFactory(Finalize.class,
new WritableFactory() {
public Writable newInstance() {return new Finalize();}
@@ -56,7 +46,6 @@
}
public static final DatanodeCommand REGISTER = new Register();
- public static final DatanodeCommand BLOCKREPORT = new BlockReport();
public static final DatanodeCommand FINALIZE = new Finalize();
private int action;
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Mon Aug 4 16:03:46 2008
@@ -35,9 +35,9 @@
**********************************************************************/
public interface DatanodeProtocol extends VersionedProtocol {
/**
- * 16: Block parameter added to nextGenerationStamp().
+ * 17: Remove the request for block report.
*/
- public static final long versionID = 16L;
+ public static final long versionID = 17L;
// error code
final static int NOTIFY = 0;
@@ -54,8 +54,7 @@
final static int DNA_SHUTDOWN = 3; // shutdown node
final static int DNA_REGISTER = 4; // re-register
final static int DNA_FINALIZE = 5; // finalize previous upgrade
- final static int DNA_BLOCKREPORT = 6; // request a block report
- final static int DNA_RECOVERBLOCK = 7; // request a block recovery
+ final static int DNA_RECOVERBLOCK = 6; // request a block recovery
/**
* Register Datanode.
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Mon Aug 4 16:03:46 2008
@@ -23,7 +23,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
import java.util.Random;
import java.io.RandomAccessFile;
@@ -72,7 +71,6 @@
new ArrayList<DataNodeProperties>();
private File base_dir;
private File data_dir;
- private DNSToSwitchMapping dnsToSwitchMapping;
/**
@@ -400,7 +398,8 @@
String name = hosts[i - curDatanodesNum];
System.out.println("Adding node with hostname : " + name + " to rack "+
racks[i-curDatanodesNum]);
- StaticMapping.addNodeToRack(name, racks[i-curDatanodesNum]);
+ StaticMapping.addNodeToRack(name,
+ racks[i-curDatanodesNum]);
}
Configuration newconf = new Configuration(dnConf); // save config
if (hosts != null) {
@@ -713,22 +712,6 @@
} catch (Exception e) {
}
}
- int numResolved = 0;
- do {
- numResolved = 0;
- for (DatanodeInfo info : dnInfos) {
- if (!info.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
- numResolved++;
- } else {
- try {
- Thread.sleep(500);
- } catch (Exception e) {
- }
- dnInfos = client.datanodeReport(DatanodeReportType.LIVE);
- break;
- }
- }
- } while (numResolved != numDataNodes);
client.close();
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/NNThroughputBenchmark.java?rev=682536&r1=682535&r2=682536&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/NNThroughputBenchmark.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/NNThroughputBenchmark.java Mon Aug 4 16:03:46 2008
@@ -699,23 +699,6 @@
datanodes[idx].sendHeartbeat();
prevDNName = datanodes[idx].getName();
}
- int numResolved = 0;
- DatanodeInfo[] dnInfos = nameNode.getDatanodeReport(DatanodeReportType.ALL);
- do {
- numResolved = 0;
- for (DatanodeInfo info : dnInfos) {
- if (!info.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
- numResolved++;
- } else {
- try {
- Thread.sleep(2);
- } catch (Exception e) {
- }
- dnInfos = nameNode.getDatanodeReport(DatanodeReportType.LIVE);
- break;
- }
- }
- } while (numResolved != nrDatanodes);
// create files
LOG.info("Creating " + nrFiles + " with " + blocksPerFile + " blocks
each.");