Author: omalley
Date: Tue Aug 12 14:46:24 2008
New Revision: 685330
URL: http://svn.apache.org/viewvc?rev=685330&view=rev
Log:
HADOOP-3780. Remove asynchronous resolution of network topology in the
JobTracker (Amar Kamat via omalley)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=685330&r1=685329&r2=685330&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Aug 12 14:46:24 2008
@@ -172,6 +172,9 @@
HADOOP-3851. Fix spelling mistake in FSNamesystemMetrics. (Steve Loughran
via omalley)
+ HADOOP-3780. Remove asynchronous resolution of network topology in the
+ JobTracker (Amar Kamat via omalley)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=685330&r1=685329&r2=685330&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue
Aug 12 14:46:24 2008
@@ -87,7 +87,6 @@
private DNSToSwitchMapping dnsToSwitchMapping;
private NetworkTopology clusterMap = new NetworkTopology();
- private ResolutionThread resThread = new ResolutionThread();
private int numTaskCacheLevels; // the max level to which we cache tasks
private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
private final TaskScheduler taskScheduler;
@@ -688,7 +687,6 @@
this.expireTrackersThread = new Thread(this.expireTrackers,
"expireTrackers");
this.expireTrackersThread.start();
- this.resThread.start();
this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
this.retireJobsThread.start();
taskScheduler.start();
@@ -758,15 +756,6 @@
ex.printStackTrace();
}
}
- if (this.resThread != null) {
- LOG.info("Stopping DNSToSwitchMapping Resolution thread");
- this.resThread.interrupt();
- try {
- this.resThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
if (this.completedJobsStoreThread != null &&
this.completedJobsStoreThread.isAlive()) {
LOG.info("Stopping completedJobsStore thread");
@@ -1211,6 +1200,12 @@
}
}
+ // Register the tracker if its not registered
+ if (getNode(trackerName) == null) {
+ // Making the network location resolution inline ..
+ resolveAndAddToTopology(status.getHost());
+ }
+
// Process this heartbeat
short newResponseId = (short)(responseId + 1);
if (!processHeartbeat(status, initialContact)) {
@@ -1390,7 +1385,6 @@
if (initialContact) {
trackerExpiryQueue.add(trackerStatus);
- resThread.addToResolutionQueue(trackerStatus);
}
}
}
@@ -1400,64 +1394,6 @@
return true;
}
- private class ResolutionThread extends Thread {
- private LinkedBlockingQueue<TaskTrackerStatus> queue =
- new LinkedBlockingQueue <TaskTrackerStatus>();
- public ResolutionThread() {
- setName("DNSToSwitchMapping reolution Thread");
- setDaemon(true);
- }
- public void addToResolutionQueue(TaskTrackerStatus t) {
- while (!queue.add(t)) {
- LOG.warn("Couldn't add to the Resolution queue now. Will " +
- "try again");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ie) {}
- }
- }
- @Override
- public void run() {
- while (!isInterrupted()) {
- try {
- List <TaskTrackerStatus> statuses =
- new ArrayList<TaskTrackerStatus>(queue.size());
- // Block if the queue is empty
- statuses.add(queue.take());
- queue.drainTo(statuses);
- List<String> dnHosts = new ArrayList<String>(statuses.size());
- for (TaskTrackerStatus t : statuses) {
- dnHosts.add(t.getHost());
- }
- List<String> rName = dnsToSwitchMapping.resolve(dnHosts);
- if (rName == null) {
- LOG.error("The resolve call returned null! Using " +
- NetworkTopology.DEFAULT_RACK + " for some hosts");
- rName = new ArrayList<String>(dnHosts.size());
- for (int i = 0; i < dnHosts.size(); i++) {
- rName.add(NetworkTopology.DEFAULT_RACK);
- }
- }
- int i = 0;
- for (String m : rName) {
- String host = statuses.get(i++).getHost();
- String networkLoc = NodeBase.normalize(m);
- addHostToNodeMapping(host, networkLoc);
- numResolved++;
- }
- } catch (InterruptedException ie) {
- LOG.warn(getName() + " exiting, got interrupted: " +
- StringUtils.stringifyException(ie));
- return;
- } catch (Throwable t) {
- LOG.error(getName() + " got an exception: " +
- StringUtils.stringifyException(t));
- }
- }
- LOG.warn(getName() + " exiting...");
- }
- }
-
/**
* A tracker wants to know if any of its Tasks have been
* closed (because the job completed, whether successfully or not)
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=685330&r1=685329&r2=685330&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
(original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue
Aug 12 14:46:24 2008
@@ -426,13 +426,6 @@
taskTrackerThread.start();
}
- // Wait till the MR cluster stabilizes
- while(jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
- try {
- Thread.sleep(20);
- } catch (InterruptedException ie) {
- }
- }
waitUntilIdle();
}