Author: ddas
Date: Tue Dec 30 03:59:07 2008
New Revision: 730106
URL: http://svn.apache.org/viewvc?rev=730106&view=rev
Log:
HADOOP-3780. Fixes a NullPointerException caused by nodes not being resolved
by the resolution thread in time. Patch contributed by Ravi Gummadi for the
0.18 branch (Amar Kamat had fixed the issue for 0.19).
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=730106&r1=730105&r2=730106&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Tue Dec 30 03:59:07 2008
@@ -121,6 +121,9 @@
HADOOP-4951. Lease monitor should acquire the LeaseManager lock but not the
Monitor lock. (szetszwo)
+ HADOOP-3780. Fixes a problem to do with NPE due to nodes not being
+ resolved by the resolution thread in time. (Ravi Gummadi via ddas)
+
Release 0.18.2 - 2008-11-03
BUG FIXES
Modified:
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=730106&r1=730105&r2=730106&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Tue Dec 30 03:59:07 2008
@@ -86,7 +86,6 @@
private DNSToSwitchMapping dnsToSwitchMapping;
private NetworkTopology clusterMap = new NetworkTopology();
- private ResolutionThread resThread = new ResolutionThread();
private int numTaskCacheLevels; // the max level to which we cache tasks
private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
@@ -652,6 +651,11 @@
this.localMachine = addr.getHostName();
this.port = addr.getPort();
int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
+
+ this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
+ conf.getClass("topology.node.switch.mapping.impl",
ScriptBasedMapping.class,
+ DNSToSwitchMapping.class), conf);
+
this.interTrackerServer = RPC.getServer(this, addr.getHostName(),
addr.getPort(), handlerCount, false, conf);
this.interTrackerServer.start();
if (LOG.isDebugEnabled()) {
@@ -741,9 +745,6 @@
infoServer.setAttribute("fileSys", historyFS);
}
- this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
- conf.getClass("topology.node.switch.mapping.impl",
ScriptBasedMapping.class,
- DNSToSwitchMapping.class), conf);
this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels",
NetworkTopology.DEFAULT_HOST_LEVEL);
synchronized (this) {
@@ -769,7 +770,6 @@
this.expireTrackersThread = new Thread(this.expireTrackers,
"expireTrackers");
this.expireTrackersThread.start();
- this.resThread.start();
this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
this.retireJobsThread.start();
this.initJobsThread = new Thread(this.initJobs, "initJobs");
@@ -846,15 +846,6 @@
ex.printStackTrace();
}
}
- if (this.resThread != null) {
- LOG.info("Stopping DNSToSwitchMapping Resolution thread");
- this.resThread.interrupt();
- try {
- this.resThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
if (this.completedJobsStoreThread != null &&
this.completedJobsStoreThread.isAlive()) {
LOG.info("Stopping completedJobsStore thread");
@@ -1287,6 +1278,12 @@
}
}
+ // Register the tracker if it's not registered
+ if (getNode(trackerName) == null) {
+ // Making the network location resolution inline ..
+ resolveAndAddToTopology(status.getHost());
+ }
+
// Process this heartbeat
short newResponseId = (short)(responseId + 1);
if (!processHeartbeat(status, initialContact)) {
@@ -1453,7 +1450,6 @@
if (initialContact) {
trackerExpiryQueue.add(trackerStatus);
- resThread.addToResolutionQueue(trackerStatus);
}
}
}
@@ -1463,64 +1459,6 @@
return true;
}
- private class ResolutionThread extends Thread {
- private LinkedBlockingQueue<TaskTrackerStatus> queue =
- new LinkedBlockingQueue <TaskTrackerStatus>();
- public ResolutionThread() {
- setName("DNSToSwitchMapping reolution Thread");
- setDaemon(true);
- }
- public void addToResolutionQueue(TaskTrackerStatus t) {
- while (!queue.add(t)) {
- LOG.warn("Couldn't add to the Resolution queue now. Will " +
- "try again");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ie) {}
- }
- }
- @Override
- public void run() {
- while (!isInterrupted()) {
- try {
- List <TaskTrackerStatus> statuses =
- new ArrayList<TaskTrackerStatus>(queue.size());
- // Block if the queue is empty
- statuses.add(queue.take());
- queue.drainTo(statuses);
- List<String> dnHosts = new ArrayList<String>(statuses.size());
- for (TaskTrackerStatus t : statuses) {
- dnHosts.add(t.getHost());
- }
- List<String> rName = dnsToSwitchMapping.resolve(dnHosts);
- if (rName == null) {
- LOG.error("The resolve call returned null! Using " +
- NetworkTopology.DEFAULT_RACK + " for some hosts");
- rName = new ArrayList<String>(dnHosts.size());
- for (int i = 0; i < dnHosts.size(); i++) {
- rName.add(NetworkTopology.DEFAULT_RACK);
- }
- }
- int i = 0;
- for (String m : rName) {
- String host = statuses.get(i++).getHost();
- String networkLoc = NodeBase.normalize(m);
- addHostToNodeMapping(host, networkLoc);
- numResolved++;
- }
- } catch (InterruptedException ie) {
- LOG.warn(getName() + " exiting, got interrupted: " +
- StringUtils.stringifyException(ie));
- return;
- } catch (Throwable t) {
- LOG.error(getName() + " got an exception: " +
- StringUtils.stringifyException(t));
- }
- }
- LOG.warn(getName() + " exiting...");
- }
- }
-
/**
* Returns a task we'd like the TaskTracker to execute right now.
*
Modified:
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=730106&r1=730105&r2=730106&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
(original)
+++
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Tue Dec 30 03:59:07 2008
@@ -426,13 +426,6 @@
taskTrackerThread.start();
}
- // Wait till the MR cluster stabilizes
- while(jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
- try {
- Thread.sleep(20);
- } catch (InterruptedException ie) {
- }
- }
waitUntilIdle();
}