Author: omalley
Date: Thu Aug 28 14:50:50 2008
New Revision: 690020
URL: http://svn.apache.org/viewvc?rev=690020&view=rev
Log:
HADOOP-3950. Cause the Mini MR cluster to wait for task trackers to
register before continuing. (enis via omalley)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=690020&r1=690019&r2=690020&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Aug 28 14:50:50 2008
@@ -417,6 +417,9 @@
HADOOP-4037. Fix the eclipse plugin for versions of kfs and log4j. (nigel
via omalley)
+ HADOOP-3950. Cause the Mini MR cluster to wait for task trackers to
+ register before continuing. (enis via omalley)
+
Release 0.18.1 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=690020&r1=690019&r2=690020&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Aug 28 14:50:50 2008
@@ -17,17 +17,20 @@
*/
package org.apache.hadoop.mapred;
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.StaticMapping;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.fs.FileSystem;
/**
* This class creates a single-process Map-Reduce cluster for junit testing.
@@ -50,6 +53,8 @@
private String namenode;
private UnixUserGroupInformation ugi = null;
+ private JobConf job;
+
/**
* An inner class that runs a job tracker.
*/
@@ -222,8 +227,7 @@
* @return the absolute pathname of the local dir
*/
public String getTaskTrackerLocalDir(int taskTracker) {
- return ((TaskTrackerRunner)
- taskTrackerList.get(taskTracker)).getLocalDir();
+ return (taskTrackerList.get(taskTracker)).getLocalDir();
}
public JobTrackerRunner getJobTrackerRunner() {
@@ -241,8 +245,32 @@
* Wait until the system is idle.
*/
public void waitUntilIdle() {
- for(Iterator itr= taskTrackerList.iterator(); itr.hasNext();) {
- TaskTrackerRunner runner = (TaskTrackerRunner) itr.next();
+ waitTaskTrackers();
+
+ JobClient client;
+ try {
+ client = new JobClient(job);
+ while(client.getClusterStatus().getTaskTrackers()<taskTrackerList.size()) {
+ for(TaskTrackerRunner runner : taskTrackerList) {
+ if(runner.isDead) {
+ throw new RuntimeException("TaskTracker is dead");
+ }
+ }
+ Thread.sleep(1000);
+ }
+ }
+ catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ }
+
+ private void waitTaskTrackers() {
+ for(Iterator<TaskTrackerRunner> itr= taskTrackerList.iterator(); itr.hasNext();) {
+ TaskTrackerRunner runner = itr.next();
while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) {
if (!runner.isInitialized) {
LOG.info("Waiting for task tracker to start.");
@@ -256,7 +284,7 @@
}
}
}
-
+
/**
* Get the actual rpc port used.
*/
@@ -269,6 +297,9 @@
}
public JobConf createJobConf(JobConf conf) {
+ if(conf == null) {
+ conf = new JobConf();
+ }
JobConf result = new JobConf(conf);
FileSystem.setDefaultUri(result, namenode);
result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
@@ -399,7 +430,7 @@
this.numTaskTrackers = numTaskTrackers;
this.namenode = namenode;
this.ugi = ugi;
-
+
// Create the JobTracker
jobTracker = new JobTrackerRunner(conf);
jobTrackerThread = new Thread(jobTracker);
@@ -439,7 +470,8 @@
for (Thread taskTrackerThread : taskTrackerThreadList){
taskTrackerThread.start();
}
-
+
+ this.job = createJobConf(conf);
waitUntilIdle();
}
@@ -448,10 +480,10 @@
*/
public void shutdown() {
try {
- waitUntilIdle();
+ waitTaskTrackers();
for (int idx = 0; idx < numTaskTrackers; idx++) {
- TaskTrackerRunner taskTracker = (TaskTrackerRunner) taskTrackerList.get(idx);
- Thread taskTrackerThread = (Thread) taskTrackerThreadList.get(idx);
+ TaskTrackerRunner taskTracker = taskTrackerList.get(idx);
+ Thread taskTrackerThread = taskTrackerThreadList.get(idx);
taskTracker.shutdown();
taskTrackerThread.interrupt();
try {