Author: ddas
Date: Thu Jan 22 13:29:46 2009
New Revision: 736801
URL: http://svn.apache.org/viewvc?rev=736801&view=rev
Log:
HADOOP-4939. Adds a test that would inject random failures for tasks in large
jobs and would also inject TaskTracker failures. Contributed by Devaraj Das.
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ReliabilityTest.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/util/Shell.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
hadoop/core/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=736801&r1=736800&r2=736801&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jan 22 13:29:46 2009
@@ -377,6 +377,9 @@
HADOOP-4828. Updates documents to do with configuration (HADOOP-4631).
(Sharad Agarwal via ddas)
+ HADOOP-4939. Adds a test that would inject random failures for tasks in
+ large jobs and would also inject TaskTracker failures. (ddas)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to provide locations for splits
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/Shell.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/Shell.java?rev=736801&r1=736800&r2=736801&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/Shell.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/Shell.java Thu Jan 22 13:29:46 2009
@@ -334,7 +334,23 @@
* @return the output of the executed command.
*/
public static String execCommand(String ... cmd) throws IOException {
+ return execCommand(null, cmd);
+ }
+
+ /**
+ * Static method to execute a shell command.
+ * Covers most of the simple cases without requiring the user to implement
+ * the <code>Shell</code> interface.
+ * @param env the map of environment key=value
+ * @param cmd shell command to execute.
+ * @return the output of the executed command.
+ */
+ public static String execCommand(Map<String,String> env, String ... cmd)
+ throws IOException {
ShellCommandExecutor exec = new ShellCommandExecutor(cmd);
+ if (env != null) {
+ exec.setEnvironment(env);
+ }
exec.execute();
return exec.getOutput();
}
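
For reference, a minimal sketch of how the new execCommand overload might be called; the HADOOP_SLAVES value and the command below are illustrative, not part of this change:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.util.Shell;

    public class ExecWithEnvExample {
      public static void main(String[] args) throws IOException {
        // Hypothetical override: the child process sees this variable
        // in addition to the caller's environment.
        Map<String, String> env = new HashMap<String, String>();
        env.put("HADOOP_SLAVES", "/tmp/my_slaves_file");

        // Passing null for env behaves exactly like the old overload.
        String output = Shell.execCommand(env, "env");
        System.out.println(output);
      }
    }

ReliabilityTest below uses this overload to point bin/slaves.sh at a generated slaves file via HADOOP_SLAVES.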
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=736801&r1=736800&r2=736801&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java Thu Jan 22 13:29:46 2009
@@ -61,6 +61,7 @@
private Collection<String> activeTrackers = new ArrayList<String>();
private Collection<String> blacklistedTrackers = new ArrayList<String>();
private int numBlacklistedTrackers;
+ private long ttExpiryInterval;
private int map_tasks;
private int reduce_tasks;
private int max_map_tasks;
@@ -85,7 +86,8 @@
@Deprecated
ClusterStatus(int trackers, int maps, int reduces, int maxMaps,
int maxReduces, JobTracker.State state) {
- this(trackers, 0, maps, reduces, maxMaps, maxReduces, state);
+ this(trackers, 0, JobTracker.TASKTRACKER_EXPIRY_INTERVAL, maps, reduces,
+ maxMaps, maxReduces, state);
}
/**
@@ -93,16 +95,19 @@
*
* @param trackers no. of tasktrackers in the cluster
* @param blacklists no. of blacklisted tasktrackers in the cluster
+ * @param ttExpiryInterval the tasktracker expiry interval
* @param maps no. of currently running map-tasks in the cluster
* @param reduces no. of currently running reduce-tasks in the cluster
* @param maxMaps the maximum no. of map tasks in the cluster
* @param maxReduces the maximum no. of reduce tasks in the cluster
* @param state the {@link JobTracker.State} of the <code>JobTracker</code>
*/
- ClusterStatus(int trackers, int blacklists, int maps, int reduces,
+ ClusterStatus(int trackers, int blacklists, long ttExpiryInterval,
+ int maps, int reduces,
int maxMaps, int maxReduces, JobTracker.State state) {
numActiveTrackers = trackers;
numBlacklistedTrackers = blacklists;
+ this.ttExpiryInterval = ttExpiryInterval;
map_tasks = maps;
reduce_tasks = reduces;
max_map_tasks = maxMaps;
@@ -117,6 +122,7 @@
*
* @param activeTrackers active tasktrackers in the cluster
* @param blacklistedTrackers blacklisted tasktrackers in the cluster
+ * @param ttExpiryInterval the tasktracker expiry interval
* @param maps no. of currently running map-tasks in the cluster
* @param reduces no. of currently running reduce-tasks in the cluster
* @param maxMaps the maximum no. of map tasks in the cluster
@@ -125,10 +131,11 @@
*/
ClusterStatus(Collection<String> activeTrackers,
Collection<String> blacklistedTrackers,
+ long ttExpiryInterval,
int maps, int reduces, int maxMaps, int maxReduces,
JobTracker.State state) {
- this(activeTrackers.size(), blacklistedTrackers.size(), maps, reduces,
- maxMaps, maxReduces, state);
+ this(activeTrackers.size(), blacklistedTrackers.size(), ttExpiryInterval,
+ maps, reduces, maxMaps, maxReduces, state);
this.activeTrackers = activeTrackers;
this.blacklistedTrackers = blacklistedTrackers;
}
@@ -171,6 +178,14 @@
}
/**
+ * Get the tasktracker expiry interval for the cluster
+ * @return the expiry interval in msec
+ */
+ public long getTTExpiryInterval() {
+ return ttExpiryInterval;
+ }
+
+ /**
* Get the number of currently running map tasks in the cluster.
*
* @return the number of currently running map tasks in the cluster.
@@ -255,6 +270,7 @@
Text.writeString(out, tracker);
}
}
+ out.writeLong(ttExpiryInterval);
out.writeInt(map_tasks);
out.writeInt(reduce_tasks);
out.writeInt(max_map_tasks);
@@ -281,6 +297,7 @@
blacklistedTrackers.add(name);
}
}
+ ttExpiryInterval = in.readLong();
map_tasks = in.readInt();
reduce_tasks = in.readInt();
max_map_tasks = in.readInt();
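
The new long is written and read at the same position, right after the tracker lists and before the task counts, keeping write(...) and readFields(...) symmetric. A minimal sketch of that Writable pattern with illustrative fields (not the real ClusterStatus):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class ExpirySnapshot implements Writable {
      private long ttExpiryInterval;
      private int mapTasks;

      public void write(DataOutput out) throws IOException {
        out.writeLong(ttExpiryInterval); // new field, fixed position
        out.writeInt(mapTasks);          // existing fields follow
      }

      public void readFields(DataInput in) throws IOException {
        ttExpiryInterval = in.readLong(); // must mirror write() order
        mapTasks = in.readInt();
      }
    }

Any reordering between the two methods would desynchronize the stream, which is why the wire-format change is paired with the JobSubmissionProtocol version bump below.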
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=736801&r1=736800&r2=736801&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Jan 22 13:29:46 2009
@@ -1193,7 +1193,20 @@
* @throws IOException
*/
public ClusterStatus getClusterStatus() throws IOException {
- return jobSubmitClient.getClusterStatus(false);
+ return getClusterStatus(false);
+ }
+
+ /**
+ * Get status information about the Map-Reduce cluster.
+ *
+ * @param detailed if true then get a detailed status including the
+ * tracker names
+ * @return the status information about the Map-Reduce cluster as an object
+ * of {@link ClusterStatus}.
+ * @throws IOException
+ */
+ public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+ return jobSubmitClient.getClusterStatus(detailed);
}
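
A minimal usage sketch for the new public overload, assuming a client-side JobConf that points at the cluster:

    import org.apache.hadoop.mapred.ClusterStatus;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class ClusterStatusExample {
      public static void main(String[] args) throws Exception {
        JobClient jc = new JobClient(new JobConf());
        // detailed=true also populates the tracker name lists
        ClusterStatus c = jc.getClusterStatus(true);
        System.out.println("Active trackers : " + c.getActiveTrackerNames());
        System.out.println("TT expiry (msec): " + c.getTTExpiryInterval());
      }
    }

ReliabilityTest relies on exactly this pair of calls to decide how long to keep tasktrackers suspended.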
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=736801&r1=736800&r2=736801&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Thu Jan 22 13:29:46 2009
@@ -55,9 +55,11 @@
* for HADOOP-4305
* Version 19: Modified TaskReport to have TIP status and modified the
* method getClusterStatus() to take a boolean argument
- * for HADOOP-4807
+ * for HADOOP-4807
+ * Version 20: Modified ClusterStatus to have the tasktracker expiry
+ * interval for HADOOP-4939
*/
- public static final long versionID = 19L;
+ public static final long versionID = 20L;
/**
* Allocate a name for the job.
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=736801&r1=736800&r2=736801&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Jan 22 13:29:46 2009
@@ -2711,6 +2711,7 @@
List<List<String>> trackerNames = taskTrackerNames();
return new ClusterStatus(trackerNames.get(0),
trackerNames.get(1),
+ TASKTRACKER_EXPIRY_INTERVAL,
totalMaps,
totalReduces,
totalMapTaskCapacity,
@@ -2720,6 +2721,7 @@
return new ClusterStatus(taskTrackers.size() -
getBlacklistedTrackerCount(),
getBlacklistedTrackerCount(),
+ TASKTRACKER_EXPIRY_INTERVAL,
totalMaps,
totalReduces,
totalMapTaskCapacity,
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=736801&r1=736800&r2=736801&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Thu Jan 22 13:29:46 2009
@@ -386,7 +386,7 @@
}
public ClusterStatus getClusterStatus(boolean detailed) {
- return new ClusterStatus(1, 0, map_tasks, reduce_tasks, 1, 1,
+ return new ClusterStatus(1, 0, 0, map_tasks, reduce_tasks, 1, 1,
JobTracker.State.RUNNING);
}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ReliabilityTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ReliabilityTest.java?rev=736801&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ReliabilityTest.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ReliabilityTest.java Thu Jan 22 13:29:46 2009
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This class tests reliability of the framework in the face of failures of
+ * both tasks and tasktrackers. Steps:
+ * 1) Get the cluster status
+ * 2) Get the number of slots in the cluster
+ * 3) Spawn a sleepjob that occupies the entire cluster (with two waves of maps)
+ * 4) Get the list of running attempts for the job
+ * 5) Fail a few of them
+ * 6) Now fail a few trackers (ssh)
+ * 7) Job should run to completion
+ * 8) The above is repeated for the Sort suite of jobs (randomwriter, sort,
+ * validator). All jobs must complete, and finally, the sort validation
+ * should succeed.
+ * To run the test:
+ * ./bin/hadoop --config <config> jar
+ * build/hadoop-<version>-test.jar MRReliabilityTest -libjars
+ * build/hadoop-<version>-examples.jar [-scratchdir <dir>]
+ *
+ * The scratchdir is optional and by default the current directory on the client
+ * will be used as the scratch space. Note that password-less SSH must be set up
+ * between the client machine from where the test is submitted, and the cluster
+ * nodes where the test runs.
+ */
+
+public class ReliabilityTest extends Configured implements Tool {
+
+ private String dir;
+ private static final Log LOG = LogFactory.getLog(ReliabilityTest.class);
+
+ private void displayUsage() {
+ LOG.info("This must be run only in the distributed mode " +
+ "(LocalJobRunner not supported).\n\tUsage: MRReliabilityTest " +
+ "-libjars <path to hadoop-examples.jar> [-scratchdir <dir>]" +
+ "\n[-scratchdir] points to a scratch space on this host where temp" +
+ " files for this test will be created. Defaults to current working" +
+ " dir. \nPasswordless SSH must be set up between this host and the" +
+ " nodes which the test is going to use");
+ System.exit(-1);
+ }
+
+ public int run(String[] args) throws Exception {
+ Configuration conf = getConf();
+ if ("local".equals(conf.get("mapred.job.tracker", "local"))) {
+ displayUsage();
+ }
+ String[] otherArgs =
+ new GenericOptionsParser(conf, args).getRemainingArgs();
+ if (otherArgs.length == 2) {
+ if (otherArgs[0].equals("-scratchdir")) {
+ dir = otherArgs[1];
+ } else {
+ displayUsage();
+ }
+ }
+ else if (otherArgs.length == 0) {
+ dir = System.getProperty("user.dir");
+ } else {
+ displayUsage();
+ }
+
+ //to protect against the case of jobs failing even when multiple attempts
+ //fail, set some high values for the max attempts
+ conf.setInt("mapred.map.max.attempts", 10);
+ conf.setInt("mapred.reduce.max.attempts", 10);
+ runSleepJobTest(new JobClient(new JobConf(conf)), conf);
+ runSortJobTests(new JobClient(new JobConf(conf)), conf);
+ return 0;
+ }
+
+ private void runSleepJobTest(final JobClient jc, final Configuration conf)
+ throws Exception {
+ ClusterStatus c = jc.getClusterStatus();
+ int maxMaps = c.getMaxMapTasks() * 2;
+ int maxReduces = maxMaps;
+ int mapSleepTime = (int)c.getTTExpiryInterval();
+ int reduceSleepTime = mapSleepTime;
+ String[] sleepJobArgs = new String[] {
+ "-m", Integer.toString(maxMaps),
+ "-r", Integer.toString(maxReduces),
+ "-mt", Integer.toString(mapSleepTime),
+ "-rt", Integer.toString(reduceSleepTime)};
+ runTest(jc, conf, "org.apache.hadoop.examples.SleepJob", sleepJobArgs,
+ new KillTaskThread(jc, 2, 0.2f, false, 2),
+ new KillTrackerThread(jc, 2, 0.4f, false, 1));
+ LOG.info("SleepJob done");
+ }
+
+ private void runSortJobTests(final JobClient jc, final Configuration conf)
+ throws Exception {
+ String inputPath = "my_reliability_test_input";
+ String outputPath = "my_reliability_test_output";
+ FileSystem fs = jc.getFs();
+ fs.delete(new Path(inputPath), true);
+ fs.delete(new Path(outputPath), true);
+ runRandomWriterTest(jc, conf, inputPath);
+ runSortTest(jc, conf, inputPath, outputPath);
+ runSortValidatorTest(jc, conf, inputPath, outputPath);
+ }
+
+ private void runRandomWriterTest(final JobClient jc,
+ final Configuration conf, final String inputPath)
+ throws Exception {
+ runTest(jc, conf, "org.apache.hadoop.examples.RandomWriter",
+ new String[]{inputPath},
+ null, new KillTrackerThread(jc, 0, 0.4f, false, 1));
+ LOG.info("RandomWriter job done");
+ }
+
+ private void runSortTest(final JobClient jc, final Configuration conf,
+ final String inputPath, final String outputPath)
+ throws Exception {
+ runTest(jc, conf, "org.apache.hadoop.examples.Sort",
+ new String[]{inputPath, outputPath},
+ new KillTaskThread(jc, 2, 0.2f, false, 2),
+ new KillTrackerThread(jc, 2, 0.8f, false, 1));
+ LOG.info("Sort job done");
+ }
+
+ private void runSortValidatorTest(final JobClient jc,
+ final Configuration conf, final String inputPath, final String outputPath)
+ throws Exception {
+ runTest(jc, conf, "org.apache.hadoop.mapred.SortValidator", new String[] {
+ "-sortInput", inputPath, "-sortOutput", outputPath},
+ new KillTaskThread(jc, 2, 0.2f, false, 1),
+ new KillTrackerThread(jc, 2, 0.8f, false, 1));
+ LOG.info("SortValidator job done");
+ }
+
+ private String normalizeCommandPath(String command) {
+ final String hadoopHome;
+ if ((hadoopHome = System.getenv("HADOOP_HOME")) != null) {
+ command = hadoopHome + "/" + command;
+ }
+ return command;
+ }
+
+ private void checkJobExitStatus(int status, String jobName) {
+ if (status != 0) {
+ LOG.info(jobName + " job failed with status: " + status);
+ System.exit(status);
+ } else {
+ LOG.info(jobName + " done.");
+ }
+ }
+
+ //Starts the job in a thread. It also starts the taskKill/tasktrackerKill
+ //threads.
+ private void runTest(final JobClient jc, final Configuration conf,
+ final String jobClass, final String[] args, KillTaskThread killTaskThread,
+ KillTrackerThread killTrackerThread) throws Exception {
+ int prevJobsNum = jc.getAllJobs().length;
+ Thread t = new Thread("Job Test") {
+ public void run() {
+ try {
+ Class<?> jobClassObj = conf.getClassByName(jobClass);
+ int status = ToolRunner.run(conf, (Tool)(jobClassObj.newInstance()),
+ args);
+ checkJobExitStatus(status, jobClass);
+ } catch (Exception e) {
+ LOG.fatal("JOB " + jobClass + " failed to run");
+ System.exit(-1);
+ }
+ }
+ };
+ t.setDaemon(true);
+ t.start();
+ JobStatus[] jobs;
+ //get the job ID. This is the job that we just submitted
+ while ((jobs = jc.getAllJobs()).length - prevJobsNum == 0) {
+ LOG.info("Waiting for the job " + jobClass +" to start");
+ Thread.sleep(1000);
+ }
+ JobID jobId = jobs[jobs.length - 1].getJobID();
+ RunningJob rJob = jc.getJob(jobId);
+ while (rJob.getJobState() == JobStatus.PREP) {
+ LOG.info("JobID : " + jobId + " not started RUNNING yet");
+ Thread.sleep(1000);
+ rJob = jc.getJob(jobId);
+ }
+ if (killTaskThread != null) {
+ killTaskThread.setRunningJob(rJob);
+ killTaskThread.start();
+ killTaskThread.join();
+ LOG.info("DONE WITH THE TASK KILL/FAIL TESTS");
+ }
+ if (killTrackerThread != null) {
+ killTrackerThread.setRunningJob(rJob);
+ killTrackerThread.start();
+ killTrackerThread.join();
+ LOG.info("DONE WITH THE TESTS TO DO WITH LOST TASKTRACKERS");
+ }
+ t.join();
+ }
+
+ private class KillTrackerThread extends Thread {
+ private volatile boolean killed = false;
+ private JobClient jc;
+ private RunningJob rJob;
+ final private int thresholdMultiplier;
+ private float threshold = 0.2f;
+ private boolean onlyMapsProgress;
+ private int numIterations;
+ final private String slavesFile = dir + "/_reliability_test_slaves_file_";
+ final String shellCommand = normalizeCommandPath("bin/slaves.sh");
+ final private String STOP_COMMAND = "ps uwwx | grep java | grep " +
+ "org.apache.hadoop.mapred.TaskTracker"+ " |" +
+ " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s STOP";
+ final private String RESUME_COMMAND = "ps uwwx | grep java | grep " +
+ "org.apache.hadoop.mapred.TaskTracker"+ " |" +
+ " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s CONT";
+ //Only one instance must be active at any point
+ public KillTrackerThread(JobClient jc, int thresholdMultiplier,
+ float threshold, boolean onlyMapsProgress, int numIterations) {
+ this.jc = jc;
+ this.thresholdMultiplier = thresholdMultiplier;
+ this.threshold = threshold;
+ this.onlyMapsProgress = onlyMapsProgress;
+ this.numIterations = numIterations;
+ setDaemon(true);
+ }
+ public void setRunningJob(RunningJob rJob) {
+ this.rJob = rJob;
+ }
+ public void kill() {
+ killed = true;
+ }
+ public void run() {
+ stopStartTrackers(true);
+ if (!onlyMapsProgress) {
+ stopStartTrackers(false);
+ }
+ }
+ private void stopStartTrackers(boolean considerMaps) {
+ if (considerMaps) {
+ LOG.info("Will STOP/RESUME tasktrackers based on Maps'" +
+ " progress");
+ } else {
+ LOG.info("Will STOP/RESUME tasktrackers based on " +
+ "Reduces' progress");
+ }
+ LOG.info("Initial progress threshold: " + threshold +
+ ". Threshold Multiplier: " + thresholdMultiplier +
+ ". Number of iterations: " + numIterations);
+ float thresholdVal = threshold;
+ int numIterationsDone = 0;
+ while (!killed) {
+ try {
+ float progress;
+ if (jc.getJob(rJob.getID()).isComplete() ||
+ numIterationsDone == numIterations) {
+ break;
+ }
+
+ if (considerMaps) {
+ progress = jc.getJob(rJob.getID()).mapProgress();
+ } else {
+ progress = jc.getJob(rJob.getID()).reduceProgress();
+ }
+ if (progress >= thresholdVal) {
+ numIterationsDone++;
+ ClusterStatus c;
+ stopTaskTrackers((c = jc.getClusterStatus(true)));
+ Thread.sleep((int)Math.ceil(1.5 * c.getTTExpiryInterval()));
+ startTaskTrackers();
+ thresholdVal = thresholdVal * thresholdMultiplier;
+ }
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ killed = true;
+ return;
+ } catch (Exception e) {
+ LOG.fatal(StringUtils.stringifyException(e));
+ }
+ }
+ }
+ private void stopTaskTrackers(ClusterStatus c) throws Exception {
+
+ Collection <String> trackerNames = c.getActiveTrackerNames();
+ ArrayList<String> trackerNamesList = new ArrayList<String>(trackerNames);
+ Collections.shuffle(trackerNamesList);
+
+ int count = 0;
+
+ FileOutputStream fos = new FileOutputStream(new File(slavesFile));
+ LOG.info(new Date() + " Stopping a few trackers");
+
+ for (String tracker : trackerNamesList) {
+ String host = convertTrackerNameToHostName(tracker);
+ LOG.info(new Date() + " Marking tracker on host: " + host);
+ fos.write((host + "\n").getBytes());
+ if (count++ >= trackerNamesList.size()/2) {
+ break;
+ }
+ }
+ fos.close();
+
+ runOperationOnTT("suspend");
+ }
+
+ private void startTaskTrackers() throws Exception {
+ LOG.info(new Date() + " Resuming the stopped trackers");
+ runOperationOnTT("resume");
+ new File(slavesFile).delete();
+ }
+
+ private void runOperationOnTT(String operation) throws IOException {
+ Map<String,String> hMap = new HashMap<String,String>();
+ hMap.put("HADOOP_SLAVES", slavesFile);
+ StringTokenizer strToken;
+ if (operation.equals("suspend")) {
+ strToken = new StringTokenizer(STOP_COMMAND, " ");
+ } else {
+ strToken = new StringTokenizer(RESUME_COMMAND, " ");
+ }
+ String commandArgs[] = new String[strToken.countTokens() + 1];
+ int i = 0;
+ commandArgs[i++] = shellCommand;
+ while (strToken.hasMoreTokens()) {
+ commandArgs[i++] = strToken.nextToken();
+ }
+ String output = Shell.execCommand(hMap, commandArgs);
+ if (output != null && !output.equals("")) {
+ LOG.info(output);
+ }
+ }
+
+ private String convertTrackerNameToHostName(String trackerName) {
+ // Convert the trackerName to it's host name
+ int indexOfColon = trackerName.indexOf(":");
+ String trackerHostName = (indexOfColon == -1) ?
+ trackerName :
+ trackerName.substring(0, indexOfColon);
+ return trackerHostName.substring("tracker_".length());
+ }
+
+ }
+
+ private class KillTaskThread extends Thread {
+
+ private volatile boolean killed = false;
+ private RunningJob rJob;
+ private JobClient jc;
+ final private int thresholdMultiplier;
+ private float threshold = 0.2f;
+ private boolean onlyMapsProgress;
+ private int numIterations;
+ public KillTaskThread(JobClient jc, int thresholdMultiplier,
+ float threshold, boolean onlyMapsProgress, int numIterations) {
+ this.jc = jc;
+ this.thresholdMultiplier = thresholdMultiplier;
+ this.threshold = threshold;
+ this.onlyMapsProgress = onlyMapsProgress;
+ this.numIterations = numIterations;
+ setDaemon(true);
+ }
+ public void setRunningJob(RunningJob rJob) {
+ this.rJob = rJob;
+ }
+ public void kill() {
+ killed = true;
+ }
+ public void run() {
+ killBasedOnProgress(true);
+ if (!onlyMapsProgress) {
+ killBasedOnProgress(false);
+ }
+ }
+ private void killBasedOnProgress(boolean considerMaps) {
+ boolean fail = false;
+ if (considerMaps) {
+ LOG.info("Will kill tasks based on Maps' progress");
+ } else {
+ LOG.info("Will kill tasks based on Reduces' progress");
+ }
+ LOG.info("Initial progress threshold: " + threshold +
+ ". Threshold Multiplier: " + thresholdMultiplier +
+ ". Number of iterations: " + numIterations);
+ float thresholdVal = threshold;
+ int numIterationsDone = 0;
+ while (!killed) {
+ try {
+ float progress;
+ if (jc.getJob(rJob.getID()).isComplete() ||
+ numIterationsDone == numIterations) {
+ break;
+ }
+ if (considerMaps) {
+ progress = jc.getJob(rJob.getID()).mapProgress();
+ } else {
+ progress = jc.getJob(rJob.getID()).reduceProgress();
+ }
+ if (progress >= thresholdVal) {
+ numIterationsDone++;
+ if (numIterationsDone > 0 && numIterationsDone % 2 == 0) {
+ fail = true; //fail tasks instead of kill
+ }
+ ClusterStatus c = jc.getClusterStatus();
+
+ LOG.info(new Date() + " Killing a few tasks");
+
+ Collection<TaskAttemptID> runningTasks =
+ new ArrayList<TaskAttemptID>();
+ TaskReport mapReports[] = jc.getMapTaskReports(rJob.getID());
+ for (TaskReport mapReport : mapReports) {
+ if (mapReport.getCurrentStatus() == TIPStatus.RUNNING) {
+ runningTasks.addAll(mapReport.getRunningTaskAttempts());
+ }
+ }
+ if (runningTasks.size() > c.getTaskTrackers()/2) {
+ int count = 0;
+ for (TaskAttemptID t : runningTasks) {
+ LOG.info(new Date() + " Killed task : " + t);
+ rJob.killTask(t, fail);
+ if (count++ > runningTasks.size()/2) { //kill 50%
+ break;
+ }
+ }
+ }
+ runningTasks.clear();
+ TaskReport reduceReports[] = jc.getReduceTaskReports(rJob.getID());
+ for (TaskReport reduceReport : reduceReports) {
+ if (reduceReport.getCurrentStatus() == TIPStatus.RUNNING) {
+ runningTasks.addAll(reduceReport.getRunningTaskAttempts());
+ }
+ }
+ if (runningTasks.size() > c.getTaskTrackers()/2) {
+ int count = 0;
+ for (TaskAttemptID t : runningTasks) {
+ LOG.info(new Date() + " Killed task : " + t);
+ rJob.killTask(t, fail);
+ if (count++ > runningTasks.size()/2) { //kill 50%
+ break;
+ }
+ }
+ }
+ thresholdVal = thresholdVal * thresholdMultiplier;
+ }
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ killed = true;
+ } catch (Exception e) {
+ LOG.fatal(StringUtils.stringifyException(e));
+ }
+ }
+ }
+ }
+
+ public static void main(String args[]) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new ReliabilityTest(), args);
+ System.exit(res);
+ }
+}
\ No newline at end of file
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=736801&r1=736800&r2=736801&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Thu Jan 22 13:29:46 2009
@@ -135,7 +135,9 @@
@Override
public ClusterStatus getClusterStatus() {
int numTrackers = trackers.size();
- return new ClusterStatus(numTrackers, 0, maps, reduces,
+ return new ClusterStatus(numTrackers, 0,
+ JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
+ maps, reduces,
numTrackers * maxMapTasksPerTracker,
numTrackers * maxReduceTasksPerTracker,
JobTracker.State.RUNNING);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java?rev=736801&r1=736800&r2=736801&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java Thu Jan 22 13:29:46 2009
@@ -22,6 +22,7 @@
import org.apache.hadoop.mapred.BigMapOutput;
import org.apache.hadoop.mapred.GenericMRLoadGenerator;
import org.apache.hadoop.mapred.MRBench;
+import org.apache.hadoop.mapred.ReliabilityTest;
import org.apache.hadoop.mapred.SortValidator;
import org.apache.hadoop.mapred.TestMapRed;
import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
@@ -74,6 +75,9 @@
pgd.addClass("filebench", FileBench.class, "Benchmark SequenceFile(Input|Output)Format (block,record compressed and uncompressed), Text(Input|Output)Format (compressed and uncompressed)");
pgd.addClass("dfsthroughput", BenchmarkThroughput.class,
"measure hdfs throughput");
+ pgd.addClass("MRReliabilityTest", ReliabilityTest.class,
+ "A program that tests the reliability of the MR framework by " +
+ "injecting faults/failures");
pgd.driver(argv);
} catch(Throwable e) {
e.printStackTrace();