Author: ddas
Date: Thu Feb 12 17:48:42 2009
New Revision: 743816
URL: http://svn.apache.org/viewvc?rev=743816&view=rev
Log:
HADOOP-5067. Fixes TaskInProgress.java to keep track of the count of failed and
killed tasks correctly. Contributed by Amareshwari Sriramadasu.
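
For readers skimming the diff: the fix makes TaskInProgress remember every attempt it has ever launched in a new `tasks` set, and count a reported failure or kill whenever the attempt belongs to that set, rather than only when the attempt could still be removed from `activeTasks`. The standalone sketch below condenses that bookkeeping; the class name, String attempt ids, and the State enum are simplified stand-ins for illustration, not the actual TaskInProgress types.

  import java.util.TreeMap;
  import java.util.TreeSet;

  public class AttemptCountingSketch {
    enum State { FAILED, KILLED }

    // Attempt id -> tracker id, holding only attempts that are currently
    // running, mirroring the activeTasks map in TaskInProgress.
    private final TreeMap<String, String> activeTasks =
        new TreeMap<String, String>();
    // Every attempt id ever launched for this task, mirroring the new 'tasks' set.
    private final TreeSet<String> tasks = new TreeSet<String>();

    private int numTaskFailures = 0;
    private int numKilledTasks = 0;

    void addRunningTask(String attemptId, String trackerId) {
      activeTasks.put(attemptId, trackerId);
      tasks.add(attemptId);          // the new bookkeeping added by this patch
    }

    // Called when an attempt is reported failed or killed; the tracker may
    // already have been declared lost, so the attempt may no longer be active.
    void incompleteSubTask(String attemptId, State state) {
      activeTasks.remove(attemptId);
      // Old logic counted only when activeTasks.remove() actually removed
      // something, which dropped failures reported for attempts whose tracker
      // had not yet re-registered with a restarted JobTracker. New logic counts
      // whenever this TIP ever launched the attempt.
      if (tasks.contains(attemptId)) {
        if (state == State.FAILED) {
          numTaskFailures++;
        } else if (state == State.KILLED) {
          numKilledTasks++;
        }
      }
    }

    public static void main(String[] args) {
      AttemptCountingSketch tip = new AttemptCountingSketch();
      tip.addRunningTask("attempt_200902121748_0001_m_000000_0", "tracker_a");
      // Simulate the tracker being declared lost before the kill is reported.
      tip.activeTasks.remove("attempt_200902121748_0001_m_000000_0");
      tip.incompleteSubTask("attempt_200902121748_0001_m_000000_0", State.KILLED);
      // With the old isPresent check this would print 0; with the set it prints 1.
      System.out.println("killed attempts counted: " + tip.numKilledTasks);
    }
  }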
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=743816&r1=743815&r2=743816&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Feb 12 17:48:42 2009
@@ -841,6 +841,9 @@
HADOOP-5166. Fix JobTracker restart to work when ACLs are configured
for the JobTracker. (Amar Kamat via yhemanth).
+ HADOOP-5067. Fixes TaskInProgress.java to keep track of the count of failed and
+ killed tasks correctly. (Amareshwari Sriramadasu via ddas)
+
Release 0.19.0 - 2008-11-18
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=743816&r1=743815&r2=743816&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Thu Feb 12 17:48:42 2009
@@ -94,6 +94,8 @@
// Map from task Id -> TaskTracker Id, contains tasks that are
// currently running
private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+ // All attempt Ids of this TIP
+ private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
private JobConf conf;
private Map<TaskAttemptID,List<String>> taskDiagnosticData =
new TreeMap<TaskAttemptID,List<String>>();
@@ -602,9 +604,7 @@
}
}
- // Note that there can be failures of tasks that are hosted on a machine
- // that has not yet registered with restarted jobtracker
- boolean isPresent = this.activeTasks.remove(taskid) != null;
+ this.activeTasks.remove(taskid);
// Since we do not fail completed reduces (whose outputs go to hdfs), we
// should note this failure only for completed maps, only if this taskid;
@@ -618,8 +618,10 @@
resetSuccessfulTaskid();
}
+ // Note that there can be failures of tasks that are hosted on a machine
+ // that has not yet registered with restarted jobtracker
// recalculate the counts only if it's a genuine failure
- if (isPresent) {
+ if (tasks.contains(taskid)) {
if (taskState == TaskStatus.State.FAILED) {
numTaskFailures++;
machinesWhereFailed.add(trackerHostName);
@@ -924,6 +926,7 @@
}
activeTasks.put(taskid, taskTracker);
+ tasks.add(taskid);
// Ask JobTracker to note that the task exists
jobtracker.createTaskEntry(taskid, taskTracker, this);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=743816&r1=743815&r2=743816&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Feb 12 17:48:42 2009
@@ -235,6 +235,9 @@
return jobTracker;
}
+ TaskTrackerRunner getTaskTrackerRunner(int id) {
+ return taskTrackerList.get(id);
+ }
/**
* Get the number of task trackers in the cluster
*/
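
The new package-private accessor above lets a test reach an individual TaskTracker; it is exercised by the TestLostTracker test added below. A hedged usage sketch, assuming an already-started MiniMRCluster `mr` as in that test (the helper name is hypothetical):

  // Hypothetical helper inside a test in org.apache.hadoop.mapred; 'mr' is an
  // already-started MiniMRCluster, as in TestLostTracker below.
  private TaskAttemptID firstNonRunningAttempt(MiniMRCluster mr) {
    return mr.getTaskTrackerRunner(0)      // runner wrapping the first tracker
             .getTaskTracker()             // the TaskTracker it hosts
             .getNonRunningTasks().get(0)  // a task that is no longer running
             .getTaskID();                 // its attempt id
  }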
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java?rev=743816&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java Thu Feb 12 17:48:42 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import junit.framework.TestCase;
+import java.io.*;
+
+public class TestLostTracker extends TestCase {
+ final Path testDir = new Path("/jt-lost-tt");
+ final Path inDir = new Path(testDir, "input");
+ final Path shareDir = new Path(testDir, "share");
+ final Path outputDir = new Path(testDir, "output");
+
+ private JobConf configureJob(JobConf conf, int maps, int reduces,
+ String mapSignal, String redSignal)
+ throws IOException {
+ UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir,
+ maps, reduces, "test-lost-tt",
+ mapSignal, redSignal);
+ return conf;
+ }
+
+ public void testLostTracker(MiniDFSCluster dfs,
+ MiniMRCluster mr)
+ throws IOException {
+ FileSystem fileSys = dfs.getFileSystem();
+ JobConf jobConf = mr.createJobConf();
+ int numMaps = 10;
+ int numReds = 1;
+ String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
+ String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
+
+ // Configure the job
+ JobConf job = configureJob(jobConf, numMaps, numReds,
+ mapSignalFile, redSignalFile);
+
+ fileSys.delete(shareDir, true);
+
+ // Submit the job
+ JobClient jobClient = new JobClient(job);
+ RunningJob rJob = jobClient.submitJob(job);
+ JobID id = rJob.getID();
+
+ // wait for the job to be inited
+ mr.initializeJob(id);
+
+ // Make sure that the master job is 50% completed
+ while (UtilsForTests.getJobStatus(jobClient, id).mapProgress()
+ < 0.5f) {
+ UtilsForTests.waitFor(10);
+ }
+
+ // get a completed task on 1st tracker
+ TaskAttemptID taskid = mr.getTaskTrackerRunner(0).getTaskTracker().
+ getNonRunningTasks().get(0).getTaskID();
+
+ // Kill the 1st tasktracker
+ mr.stopTaskTracker(0);
+
+ // Signal all the maps to complete
+ UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile,
+ redSignalFile);
+
+ // Signal the reducers to complete
+ UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile,
+ redSignalFile);
+ // wait till the job is done
+ UtilsForTests.waitTillDone(jobClient);
+
+ // Check if the tasks on the lost tracker got killed and re-executed
+ assertTrue(jobClient.getClusterStatus().getTaskTrackers()
+ < mr.getNumTaskTrackers());
+ assertEquals(JobStatus.SUCCEEDED, rJob.getJobState());
+ TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
+ getTip(taskid.getTaskID());
+ assertTrue(tip.isComplete());
+ assertEquals(tip.numKilledTasks(), 1);
+
+ }
+
+ public void testLostTracker() throws IOException {
+ String namenode = null;
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ FileSystem fileSys = null;
+
+ try {
+ Configuration conf = new Configuration();
+ conf.setBoolean("dfs.replication.considerLoad", false);
+ dfs = new MiniDFSCluster(conf, 1, true, null, null);
+ dfs.waitActive();
+ fileSys = dfs.getFileSystem();
+
+ // clean up
+ fileSys.delete(testDir, true);
+
+ if (!fileSys.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+
+ // Write the input file
+ UtilsForTests.writeFile(dfs.getNameNode(), conf,
+ new Path(inDir + "/file"), (short)1);
+
+ dfs.startDataNodes(conf, 1, true, null, null, null, null);
+ dfs.waitActive();
+
+ namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+ + (dfs.getFileSystem()).getUri().getPort();
+
+ JobConf jtConf = new JobConf();
+ jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+ jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+ jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
+ jtConf.setInt("mapred.reduce.copy.backoff", 4);
+
+ mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
+
+ // Test Lost tracker case
+ testLostTracker(dfs, mr);
+ } finally {
+ if (mr != null) {
+ try {
+ mr.shutdown();
+ } catch (Exception e) {}
+ }
+ if (dfs != null) {
+ try {
+ dfs.shutdown();
+ } catch (Exception e) {}
+ }
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ new TestLostTracker().testLostTracker();
+ }
+}