Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=695823&view=auto ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (added) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Tue Sep 16 05:05:18 2008 @@ -0,0 +1,832 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat; +import org.apache.hadoop.mapred.ThreadedMapBenchmark.RandomInputFormat; +import org.apache.hadoop.mapred.lib.IdentityReducer; + +import junit.framework.TestCase; +import java.io.*; +import java.util.Iterator; + +/** + * TestJobTrackerRestart checks if the jobtracker can restart. JobTracker + * should be able to continue running the previously running jobs and also + * recover previosuly submitted jobs. + */ +public class TestJobTrackerRestart extends TestCase { + final static Object waitLock = new Object(); + final Path testDir = new Path("/jt-restart-testing"); + final Path inDir = new Path(testDir, "input"); + final Path shareDir = new Path(testDir, "share"); + final Path outputDir = new Path(testDir, "output"); + private static int numJobsSubmitted = 0; + + /** + * Gets job status from the jobtracker given the jobclient and the job id + */ + static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException { + JobStatus[] statuses = jc.getAllJobs(); + for (JobStatus jobStatus : statuses) { + if (jobStatus.getJobID().equals(id)) { + return jobStatus; + } + } + return null; + } + + /** + * Return the job conf configured with the priorities and mappers as passed. 
+ * @param conf The default conf + * @param priorities priorities for the jobs + * @param numMaps number of maps for the jobs + * @param numReds number of reducers for the jobs + * @param outputDir output dir + * @param inDir input dir + * @param mapSignalFile filename thats acts as a signal for maps + * @param reduceSignalFile filename thats acts as a signal for reducers + * @return a array of jobconfs configured as needed + * @throws IOException + */ + static JobConf[] getJobs(JobConf conf, JobPriority[] priorities, + int[] numMaps, int[] numReds, + Path outputDir, Path inDir, + String mapSignalFile, String reduceSignalFile) + throws IOException { + JobConf[] jobs = new JobConf[priorities.length]; + for (int i = 0; i < jobs.length; ++i) { + jobs[i] = new JobConf(conf); + Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++)); + configureWaitingJobConf(jobs[i], inDir, newOutputDir, + numMaps[i], numReds[i], "jt-restart-test-job", + mapSignalFile, reduceSignalFile); + jobs[i].setJobPriority(priorities[i]); + } + return jobs; + } + + /** + * A utility that waits for specified amount of time + */ + static void waitFor(long duration) { + try { + synchronized (waitLock) { + waitLock.wait(duration); + } + } catch (InterruptedException ie) {} + } + + /** + * Wait for the jobtracker to be RUNNING. + */ + static void waitForJobTracker(JobClient jobClient) { + while (true) { + try { + ClusterStatus status = jobClient.getClusterStatus(); + while (status.getJobTrackerState() != JobTracker.State.RUNNING) { + waitFor(100); + status = jobClient.getClusterStatus(); + } + break; // means that the jt is ready + } catch (IOException ioe) {} + } + } + + /** + * Signal the maps/reduces to start. + */ + static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, + boolean isMap, String mapSignalFile, + String reduceSignalFile) + throws IOException { + // signal the maps to complete + TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), fileSys.getConf(), + isMap + ? new Path(mapSignalFile) + : new Path(reduceSignalFile), + (short)1); + } + + /** + * Waits until all the jobs at the jobtracker complete. + */ + static void waitTillDone(JobClient jobClient) throws IOException { + // Wait for the last job to complete + while (true) { + boolean shouldWait = false; + for (JobStatus jobStatuses : jobClient.getAllJobs()) { + if (jobStatuses.getRunState() == JobStatus.RUNNING) { + shouldWait = true; + break; + } + } + if (shouldWait) { + waitFor(1000); + } else { + break; + } + } + } + + /** + * Clean up the signals. + */ + static void cleanUp(FileSystem fileSys, Path dir) throws IOException { + // Delete the map signal file + fileSys.delete(new Path(getMapSignalFile(dir)), false); + // Delete the reduce signal file + fileSys.delete(new Path(getReduceSignalFile(dir)), false); + } + + /** + * Tests multiple jobs on jobtracker with restart-recovery turned on. + * Preparation : + * - Configure 3 jobs as follows [format {prio, maps, reducers}] + * - job1 : {normal, 50, 1} + * - job2 : {low, 1, 1} + * - job3 : {high, 1, 1} + * - Configure the cluster to run 1 reducer + * - Lower the history file block size and buffer + * + * Submit these 3 jobs but make sure that job1's priority is changed and job1 + * is RUNNING before submitting other jobs + * The order in which the jobs will be executed will be job1, job3 and job2. + * + * Above ordering makes sure that job1 runs before everyone else. + * Wait for job1 to complete 50%. 
Note that all the jobs are configured to + * use {@link HalfWaitingMapper} and {@link WaitingReducer}. So job1 will + * eventually wait at 50% + * + * Make a note of the following things + * - Job start times + * + * Restart the jobtracker + * + * Wait for job1 to finish all the maps and note the TaskCompletion events at + * the tracker. + * + * Wait for all the jobs to finish + * + * Also make sure that the order in which the jobs were sorted before restart + * remains the same. For this check the following + * job1.start-time < job2.start-time < job3.start-time and + * job1.finish-time < job3.finish-time < job2.finish-time + * This ordering makes sure that the change of priority is logged and + * recovered on restart + */ + public void testRecoveryWithMultipleJobs(MiniDFSCluster dfs, + MiniMRCluster mr) + throws IOException { + FileSystem fileSys = dfs.getFileSystem(); + JobConf jobConf = mr.createJobConf(); + JobPriority[] priorities = {JobPriority.NORMAL, JobPriority.LOW, + JobPriority.HIGH}; + // Note that there is only 1 tracker + int[] numMaps = {50, 1, 1}; + int[] numReds = {1, 1, 1}; + + cleanUp(fileSys, shareDir); + + // Configure the jobs + JobConf[] jobs = getJobs(jobConf, priorities, numMaps, numReds, + outputDir, inDir, + getMapSignalFile(shareDir), + getReduceSignalFile(shareDir)); + + // Master job parameters + int masterJob = 0; + JobPriority masterJobNewPriority = JobPriority.HIGH; + + // Submit a master job + JobClient jobClient = new JobClient(jobs[masterJob]); + RunningJob job = jobClient.submitJob(jobs[masterJob]); + JobID id = job.getID(); + + // Wait for the job to be inited + mr.initializeJob(id); + + // Change the master job's priority so that priority logging is tested + mr.setJobPriority(id, masterJobNewPriority); + + // Submit the remaining jobs + for (int i = 1; i < jobs.length; ++i) { + RunningJob rJob = (new JobClient(jobs[i])).submitJob(jobs[i]); + mr.initializeJob(rJob.getID()); + } + + // Make sure that the master job is 50% completed + while (getJobStatus(jobClient, id).mapProgress() < 0.5f) { + waitFor(100); + } + + // Note the data that needs to be tested upon restart + long jobStartTime = getJobStatus(jobClient, id).getStartTime(); + + // Kill the jobtracker + mr.stopJobTracker(); + + // Signal the maps to complete + signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), + getReduceSignalFile(shareDir)); + + // Signal the reducers to complete + signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), + getReduceSignalFile(shareDir)); + + // Enable recovery on restart + mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", + true); + + // Wait for a minute before restarting the jobtracker + waitFor(60 * 1000); + + // Restart the jobtracker + mr.startJobTracker(); + + // Check if the jobs are still running + + // Wait for the JT to be ready + waitForJobTracker(jobClient); + + // Check if the job recovered + assertEquals("Restart failed as previously submitted job was missing", + true, getJobStatus(jobClient, id) != null); + + // check if the job's priority got changed + assertEquals("Restart failed as job's priority did not match", + true, mr.getJobPriority(id).equals(masterJobNewPriority)); + + + + waitTillDone(jobClient); + + // Check if the jobs are in order .. 
the order is 1->3->2 + JobStatus[] newStatuses = jobClient.getAllJobs(); + // Check if the jobs are in the order of submission + // This is important for the following checks + boolean jobOrder = newStatuses[0].getJobID().getId() == 1 + && newStatuses[1].getJobID().getId() == 2 + && newStatuses[2].getJobID().getId() == 3; + assertTrue("Job submission order changed", jobOrder); + + // Start times are in order and non zero + boolean startTimeOrder = newStatuses[0].getStartTime() > 0 + && newStatuses[0].getStartTime() + < newStatuses[1].getStartTime() + && newStatuses[1].getStartTime() + < newStatuses[2].getStartTime(); + assertTrue("Job start-times are out of order", startTimeOrder); + + boolean finishTimeOrder = + mr.getJobFinishTime(newStatuses[0].getJobID()) > 0 + && mr.getJobFinishTime(newStatuses[0].getJobID()) + < mr.getJobFinishTime(newStatuses[2].getJobID()) + && mr.getJobFinishTime(newStatuses[2].getJobID()) + < mr.getJobFinishTime(newStatuses[1].getJobID()); + assertTrue("Jobs finish-times are out of order", finishTimeOrder); + + + // This should be used for testing job counters + job.getCounters(); + + // check if the job was successful + assertTrue("Previously submitted job was not successful", + job.isSuccessful()); + + // Check if the start time was recovered + assertTrue("Previously submitted job's start time has changed", + getJobStatus(jobClient, id).getStartTime() == jobStartTime); + + // Test history files + testJobHistoryFiles(id, jobs[masterJob]); + } + + /** + * Tests the jobtracker with restart-recovery turned off. + * Submit a job with normal priority, maps = 2, reducers = 0} + * + * Wait for the job to complete 50% + * + * Restart the jobtracker with recovery turned off + * + * Check if the job is missing + */ + public void testRestartWithoutRecovery(MiniDFSCluster dfs, + MiniMRCluster mr) + throws IOException { + // III. Test a job with waiting mapper and recovery turned off + + FileSystem fileSys = dfs.getFileSystem(); + + cleanUp(fileSys, shareDir); + + JobConf newConf = getJobs(mr.createJobConf(), + new JobPriority[] {JobPriority.NORMAL}, + new int[] {2}, new int[] {0}, + outputDir, inDir, + getMapSignalFile(shareDir), + getReduceSignalFile(shareDir))[0]; + + JobClient jobClient = new JobClient(newConf); + RunningJob job = jobClient.submitJob(newConf); + JobID id = job.getID(); + + // make sure that the job is 50% completed + while (getJobStatus(jobClient, id).mapProgress() < 0.5f) { + waitFor(100); + } + + mr.stopJobTracker(); + + // Turn off the recovery + mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", + false); + + // Wait for a minute before submitting a job + waitFor(60 * 1000); + + mr.startJobTracker(); + + // Signal the tasks + signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), + getReduceSignalFile(shareDir)); + + // Wait for the JT to be ready + waitForJobTracker(jobClient); + + waitTillDone(jobClient); + + // The submitted job should not exist + assertTrue("Submitted job was detected with recovery disabled", + getJobStatus(jobClient, id) == null); + } + + /** Tests a job on jobtracker with restart-recovery turned on. + * Preparation : + * - Configure a job with + * - num-maps : 50 + * - num-reducers : 1 + * - Configure the cluster to run 1 reducer + * - Lower the history file block size and buffer + * + * Wait for the job to complete 50%. Note that all the job is configured to + * use [EMAIL PROTECTED] HalfWaitingMapper} and [EMAIL PROTECTED] WaitingReducer}. 
So job will + * eventually wait on 50% + * + * Make a note of the following things + * - Task completion events + * - Cluster status + * - Task Reports + * - Job start time + * + * Restart the jobtracker + * + * Wait for job to finish all the maps and note the TaskCompletion events at + * the tracker. + * + * Wait for all the jobs to finish and note the following + * - New task completion events at the jobtracker + * - Task reports + * - Cluster status + * + * Check for the following + * - Task completion events for recovered tasks should match + * - Task completion events at the tasktracker and the restarted + * jobtracker should be same + * - Cluster status should be fine. + * - Task Reports for recovered tasks should match + * Checks + * - start time + * - finish time + * - counters + * - http-location + * - task-id + * - Job start time should match + * - Check if the counters can be accessed + * - Check if the history files are (re)named properly + */ + public void testTaskEventsAndReportsWithRecovery(MiniDFSCluster dfs, + MiniMRCluster mr) + throws IOException { + // II. Test a tasktracker with waiting mapper and recovery turned on. + // Ideally the tracker should SYNC with the new/restarted jobtracker + + FileSystem fileSys = dfs.getFileSystem(); + final int numMaps = 50; + final int numReducers = 1; + + + cleanUp(fileSys, shareDir); + + JobConf newConf = getJobs(mr.createJobConf(), + new JobPriority[] {JobPriority.NORMAL}, + new int[] {numMaps}, new int[] {numReducers}, + outputDir, inDir, + getMapSignalFile(shareDir), + getReduceSignalFile(shareDir))[0]; + + JobClient jobClient = new JobClient(newConf); + RunningJob job = jobClient.submitJob(newConf); + JobID id = job.getID(); + + mr.initializeJob(id); + + // make sure that atleast on reducer is spawned + while (jobClient.getClusterStatus().getReduceTasks() == 0) { + waitFor(100); + } + + while(true) { + // Since we are using a half waiting mapper, maps should be stuck at 50% + TaskCompletionEvent[] trackerEvents = + mr.getMapTaskCompletionEventsUpdates(0, id, numMaps) + .getMapTaskCompletionEvents(); + if (trackerEvents.length < numMaps / 2) { + waitFor(1000); + } else { + break; + } + } + + TaskCompletionEvent[] prevEvents = + mr.getMapTaskCompletionEvents(id, 0, numMaps); + TaskReport[] prevReports = jobClient.getMapTaskReports(id); + ClusterStatus prevStatus = jobClient.getClusterStatus(); + + mr.stopJobTracker(); + + // Turn off the recovery + mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", + true); + + // Wait for a minute before submitting a job + waitFor(60 * 1000); + + mr.startJobTracker(); + + // Signal the map tasks + signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), + getReduceSignalFile(shareDir)); + + // Wait for the JT to be ready + waitForJobTracker(jobClient); + + int numToMatch = mr.getNumEventsRecovered() / 2; + + // make sure that the maps are completed + while (getJobStatus(jobClient, id).mapProgress() < 1.0f) { + waitFor(100); + } + + // Get the new jobtrackers events + TaskCompletionEvent[] jtEvents = + mr.getMapTaskCompletionEvents(id, 0, 2 * numMaps); + + // Test if all the events that were recovered match exactly + testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch); + + TaskCompletionEvent[] trackerEvents; + while(true) { + trackerEvents = + mr.getMapTaskCompletionEventsUpdates(0, id, 2 * numMaps) + .getMapTaskCompletionEvents(); + if (trackerEvents.length < jtEvents.length) { + waitFor(1000); + } else { + break; + } + } + + // Check the task reports + // The 
reports should match exactly if the attempts are same + TaskReport[] afterReports = jobClient.getMapTaskReports(id); + testTaskReports(prevReports, afterReports, numToMatch); + + // Signal the reduce tasks + signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), + getReduceSignalFile(shareDir)); + + waitTillDone(jobClient); + + testTaskCompletionEvents(jtEvents, trackerEvents, true, 2 * numMaps); + + // check if the cluster status is insane + ClusterStatus status = jobClient.getClusterStatus(); + assertTrue("Cluster status is insane", + checkClusterStatusOnCompletion(status, prevStatus)); + } + + /** + * Checks if the history files are as expected + * @param id job id + * @param conf job conf + */ + private void testJobHistoryFiles(JobID id, JobConf conf) + throws IOException { + // Get the history files for users + String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id); + String tempLogFileName = + JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName); + + // I. User files + Path logFile = + JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf); + FileSystem fileSys = logFile.getFileSystem(conf); + + // Check if the history file exists + assertTrue("User log file does not exist", fileSys.exists(logFile)); + + // Check if the temporary file is deleted + Path tempLogFile = + JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName, + conf); + assertFalse("User temporary log file exists", fileSys.exists(tempLogFile)); + + // II. Framework files + // Get the history file + logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName); + fileSys = logFile.getFileSystem(conf); + + // Check if the history file exists + assertTrue("Log file does not exist", fileSys.exists(logFile)); + + // Check if the temporary file is deleted + tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName); + assertFalse("Temporary log file exists", fileSys.exists(tempLogFile)); + } + + /** + * Matches specified number of task reports. + * @param source the reports to be matched + * @param target reports to match with + * @param numToMatch num reports to match + * @param mismatchSet reports that should not match + */ + private void testTaskReports(TaskReport[] source, TaskReport[] target, + int numToMatch) { + for (int i = 0; i < numToMatch; ++i) { + // Check if the task reports was recovered correctly + assertTrue("Task reports for same attempt has changed", + source[i].equals(target[i])); + } + } + + /** + * Matches the task completion events. 
+ * @param source the events to be matched + * @param target events to match with + * @param fullMatch whether to match the events completely or partially + * @param numToMatch number of events to match in case full match is not + * desired + * @param ignoreSet a set of taskids to ignore + */ + private void testTaskCompletionEvents(TaskCompletionEvent[] source, + TaskCompletionEvent[] target, + boolean fullMatch, + int numToMatch) { + // Check if the event list size matches + // The lengths should match only incase of full match + if (fullMatch) { + assertEquals("Map task completion events mismatch", + source.length, target.length); + numToMatch = source.length; + } + // Check if the events match + for (int i = 0; i < numToMatch; ++i) { + if (source[i].getTaskAttemptId().equals(target[i].getTaskAttemptId())){ + assertTrue("Map task completion events ordering mismatch", + source[i].equals(target[i])); + } + } + } + + private boolean checkClusterStatusOnCompletion(ClusterStatus status, + ClusterStatus prevStatus) { + return status.getJobTrackerState() == prevStatus.getJobTrackerState() + && status.getMapTasks() == 0 + && status.getReduceTasks() == 0; + } + + public void testJobTrackerRestart() throws IOException { + String namenode = null; + MiniDFSCluster dfs = null; + MiniMRCluster mr = null; + FileSystem fileSys = null; + + try { + Configuration conf = new Configuration(); + conf.setBoolean("dfs.replication.considerLoad", false); + dfs = new MiniDFSCluster(conf, 1, true, null, null); + dfs.waitActive(); + fileSys = dfs.getFileSystem(); + + // clean up + fileSys.delete(testDir, true); + + if (!fileSys.mkdirs(inDir)) { + throw new IOException("Mkdirs failed to create " + inDir.toString()); + } + + // Write the input file + TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, + new Path(inDir + "/file"), + (short)1); + + dfs.startDataNodes(conf, 1, true, null, null, null, null); + dfs.waitActive(); + + namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + + (dfs.getFileSystem()).getUri().getPort(); + + // Make sure that jobhistory leads to a proper job restart + // So keep the blocksize and the buffer size small + JobConf jtConf = new JobConf(); + jtConf.set("mapred.jobtracker.job.history.block.size", "1024"); + jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024"); + jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1); + jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000); + + mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf); + + // Test multiple jobs on jobtracker with restart-recovery turned on + testRecoveryWithMultipleJobs(dfs, mr); + + // Test the tasktracker SYNC + testTaskEventsAndReportsWithRecovery(dfs, mr); + + // Test jobtracker with restart-recovery turned off + testRestartWithoutRecovery(dfs, mr); + } finally { + if (mr != null) { + try { + mr.shutdown(); + } catch (Exception e) {} + } + if (dfs != null) { + try { + dfs.shutdown(); + } catch (Exception e) {} + } + } + } + + static String getMapSignalFile(Path dir) { + return dir.suffix("/jt-restart-map-signal").toString(); + } + + static String getReduceSignalFile(Path dir) { + return dir.suffix("/jt-restart-reduce-signal").toString(); + } + + /** + * Map is a Mapper that just waits for a file to be created on the dfs. The + * file creation is a signal to the mappers and hence acts as a waiting job. + * Only the later half of the maps wait for the signal while the rest + * complete immediately. 
+ */ + + static class HalfWaitingMapper + extends MapReduceBase + implements Mapper<WritableComparable, Writable, + WritableComparable, Writable> { + + FileSystem fs = null; + Path signal; + int id = 0; + int totalMaps = 0; + + /** The waiting function. The map exits once it gets a signal. Here the + * signal is the file existence. + */ + public void map(WritableComparable key, Writable val, + OutputCollector<WritableComparable, Writable> output, + Reporter reporter) + throws IOException { + if (id > totalMaps / 2) { + if (fs != null) { + while (!fs.exists(signal)) { + try { + reporter.progress(); + synchronized (this) { + this.wait(1000); // wait for 1 sec + } + } catch (InterruptedException ie) { + System.out.println("Interrupted while the map was waiting for " + + " the signal."); + break; + } + } + } else { + throw new IOException("Could not get the DFS!!"); + } + } + } + + public void configure(JobConf conf) { + try { + String taskId = conf.get("mapred.task.id"); + id = Integer.parseInt(taskId.split("_")[4]); + totalMaps = Integer.parseInt(conf.get("mapred.map.tasks")); + fs = FileSystem.get(conf); + signal = new Path(conf.get("test.mapred.map.waiting.target")); + } catch (IOException ioe) { + System.out.println("Got an exception while obtaining the filesystem"); + } + } + } + + /** + * Reduce that just waits for a file to be created on the dfs. The + * file creation is a signal to the reduce. + */ + + static class WaitingReducer extends MapReduceBase + implements Reducer<WritableComparable, Writable, + WritableComparable, Writable> { + + FileSystem fs = null; + Path signal; + + /** The waiting function. The reduce exits once it gets a signal. Here the + * signal is the file existence. + */ + public void reduce(WritableComparable key, Iterator<Writable> val, + OutputCollector<WritableComparable, Writable> output, + Reporter reporter) + throws IOException { + if (fs != null) { + while (!fs.exists(signal)) { + try { + reporter.progress(); + synchronized (this) { + this.wait(1000); // wait for 1 sec + } + } catch (InterruptedException ie) { + System.out.println("Interrupted while the map was waiting for the" + + " signal."); + break; + } + } + } else { + throw new IOException("Could not get the DFS!!"); + } + } + + public void configure(JobConf conf) { + try { + fs = FileSystem.get(conf); + signal = new Path(conf.get("test.mapred.reduce.waiting.target")); + } catch (IOException ioe) { + System.out.println("Got an exception while obtaining the filesystem"); + } + } + } + + static void configureWaitingJobConf(JobConf jobConf, Path inDir, + Path outputPath, int numMaps, int numRed, + String jobName, String mapSignalFilename, + String redSignalFilename) + throws IOException { + jobConf.setJobName(jobName); + jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class); + jobConf.setOutputFormat(SequenceFileOutputFormat.class); + FileInputFormat.setInputPaths(jobConf, inDir); + FileOutputFormat.setOutputPath(jobConf, outputPath); + jobConf.setMapperClass(HalfWaitingMapper.class); + jobConf.setReducerClass(IdentityReducer.class); + jobConf.setOutputKeyClass(BytesWritable.class); + jobConf.setOutputValueClass(BytesWritable.class); + jobConf.setInputFormat(RandomInputFormat.class); + jobConf.setNumMapTasks(numMaps); + jobConf.setNumReduceTasks(numRed); + jobConf.setJar("build/test/testjar/testjob.jar"); + jobConf.set("test.mapred.map.waiting.target", mapSignalFilename); + jobConf.set("test.mapred.reduce.waiting.target", redSignalFilename); + } + + public static void main(String[] args) 
throws IOException { + new TestJobTrackerRestart().testJobTrackerRestart(); + } +} \ No newline at end of file
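
The restart-recovery behaviour exercised by the test above is driven by the configuration keys that appear in the diff itself: mapred.jobtracker.restart.recover turns recovery on, and the reduced job-history block/buffer sizes make the jobtracker flush history events often enough for them to be replayed after a restart. A minimal sketch of a jobtracker-side JobConf carrying those settings (the helper class itself is illustrative only, not part of this commit):

    import org.apache.hadoop.mapred.JobConf;

    public class RestartRecoveryConf {
      /** Builds a JobConf with the recovery-related settings used by the test. */
      public static JobConf withRecovery() {
        JobConf jtConf = new JobConf();
        // Let the jobtracker recover previously submitted/running jobs on restart.
        jtConf.setBoolean("mapred.jobtracker.restart.recover", true);
        // Small history block and buffer sizes force frequent flushes of the job
        // history file, so a restarted jobtracker has enough events to replay.
        jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
        jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
        return jtConf;
      }
    }
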
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=695823&view=auto ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (added) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Tue Sep 16 05:05:18 2008 @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.TestJobTrackerRestart; + +import junit.framework.TestCase; +import java.io.*; + +/** + * This test checks whether the jobtracker can detect a tasktracker that was + * lost while the jobtracker was down, so that its tasks get re-executed after + * the restart. 
+ */ +public class TestJobTrackerRestartWithLostTracker extends TestCase { + final Path testDir = new Path("/jt-restart-lost-tt-testing"); + final Path inDir = new Path(testDir, "input"); + final Path shareDir = new Path(testDir, "share"); + final Path outputDir = new Path(testDir, "output"); + + private JobConf configureJob(JobConf conf, int[] maps, int[] reduces, + String mapSignal, String redSignal) + throws IOException { + JobPriority[] priority = new JobPriority[] {JobPriority.NORMAL}; + return TestJobTrackerRestart.getJobs(conf, priority, + maps, reduces, outputDir, inDir, + mapSignal, redSignal)[0]; + } + + public void testRecoveryWithLostTracker(MiniDFSCluster dfs, + MiniMRCluster mr) + throws IOException { + FileSystem fileSys = dfs.getFileSystem(); + JobConf jobConf = mr.createJobConf(); + int numMaps = 50; + int numReds = 1; + String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir); + String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir); + + // Configure the jobs + JobConf job = configureJob(jobConf, new int[] {numMaps}, + new int[] {numReds}, + mapSignalFile, redSignalFile); + + TestJobTrackerRestart.cleanUp(fileSys, shareDir); + + // Submit a master job + JobClient jobClient = new JobClient(job); + RunningJob rJob = jobClient.submitJob(job); + JobID id = rJob.getID(); + + // wait for the job to be inited + mr.initializeJob(id); + + // Make sure that the master job is 50% completed + while (TestJobTrackerRestart.getJobStatus(jobClient, id).mapProgress() + < 0.5f) { + TestJobTrackerRestart.waitFor(100); + } + + // Kill the jobtracker + mr.stopJobTracker(); + + // Signal the maps to complete + TestJobTrackerRestart.signalTasks(dfs, fileSys, true, + mapSignalFile, redSignalFile); + + // Enable recovery on restart + mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", + true); + + // Kill the 2nd tasktracker + mr.stopTaskTracker(1); + + // Wait for a minute before submitting a job + TestJobTrackerRestart.waitFor(60 * 1000); + + // Restart the jobtracker + mr.startJobTracker(); + + // Check if the jobs are still running + + // Wait for the JT to be ready + TestJobTrackerRestart.waitForJobTracker(jobClient); + + // Signal the reducers to complete + TestJobTrackerRestart.signalTasks(dfs, fileSys, false, + mapSignalFile, redSignalFile); + + TestJobTrackerRestart.waitTillDone(jobClient); + + // Check if the tasks on the lost tracker got re-executed + assertTrue("Tracker killed while the jobtracker was down did not get lost " + + "upon restart", + jobClient.getClusterStatus().getTaskTrackers() + < mr.getNumTaskTrackers()); + } + + public void testRestartWithLostTracker() throws IOException { + String namenode = null; + MiniDFSCluster dfs = null; + MiniMRCluster mr = null; + FileSystem fileSys = null; + + try { + Configuration conf = new Configuration(); + conf.setBoolean("dfs.replication.considerLoad", false); + dfs = new MiniDFSCluster(conf, 1, true, null, null); + dfs.waitActive(); + fileSys = dfs.getFileSystem(); + + // clean up + fileSys.delete(testDir, true); + + if (!fileSys.mkdirs(inDir)) { + throw new IOException("Mkdirs failed to create " + inDir.toString()); + } + + // Write the input file + TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, + new Path(inDir + "/file"), + (short)1); + + dfs.startDataNodes(conf, 1, true, null, null, null, null); + dfs.waitActive(); + + namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + + (dfs.getFileSystem()).getUri().getPort(); + + // Make sure that jobhistory leads to 
a proper job restart + // So keep the blocksize and the buffer size small + JobConf jtConf = new JobConf(); + jtConf.set("mapred.jobtracker.job.history.block.size", "1024"); + jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024"); + jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1); + jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000); + + mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf); + + // Test Lost tracker case + testRecoveryWithLostTracker(dfs, mr); + } finally { + if (mr != null) { + try { + mr.shutdown(); + } catch (Exception e) {} + } + if (dfs != null) { + try { + dfs.shutdown(); + } catch (Exception e) {} + } + } + } + + public static void main(String[] args) throws IOException { + new TestJobTrackerRestartWithLostTracker().testRestartWithLostTracker(); + } +} \ No newline at end of file Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=695823&r1=695822&r2=695823&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java Tue Sep 16 05:05:18 2008 @@ -36,16 +36,33 @@ public class TestMRServerPorts extends TestCase { TestHDFSServerPorts hdfs = new TestHDFSServerPorts(); + // Runs the JT in a separate thread + private static class JTRunner extends Thread { + JobTracker jt; + void setJobTracker(JobTracker jt) { + this.jt = jt; + } + + public void run() { + if (jt != null) { + try { + jt.offerService(); + } catch (Exception ioe) {} + } + } + } /** * Check whether the JobTracker can be started. 
*/ - private JobTracker startJobTracker(JobConf conf) + private JobTracker startJobTracker(JobConf conf, JTRunner runner) throws IOException { conf.set("mapred.job.tracker", "localhost:0"); conf.set("mapred.job.tracker.http.address", "0.0.0.0:0"); JobTracker jt = null; try { jt = JobTracker.startTracker(conf); + runner.setJobTracker(jt); + runner.start(); conf.set("mapred.job.tracker", "localhost:" + jt.getTrackerPort()); conf.set("mapred.job.tracker.http.address", "0.0.0.0:" + jt.getInfoPort()); @@ -132,11 +149,13 @@ public void testTaskTrackerPorts() throws Exception { NameNode nn = null; JobTracker jt = null; + JTRunner runner = null; try { nn = hdfs.startNameNode(); JobConf conf2 = new JobConf(hdfs.getConfig()); - jt = startJobTracker(conf2); + runner = new JTRunner(); + jt = startJobTracker(conf2, runner); // start job tracker on the same port as name-node conf2.set("mapred.task.tracker.report.address", @@ -165,6 +184,8 @@ if (jt != null) { jt.fs.close(); jt.stopTracker(); + runner.interrupt(); + runner.join(); } hdfs.stopNameNode(nn); } Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=695823&r1=695822&r2=695823&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Tue Sep 16 05:05:18 2008 @@ -163,7 +163,7 @@ JobID jobId = job.getID(); // construct the task id of first map task of failmap - TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId,true, 0), 0); + TaskAttemptID taskId = mr.getTaskAttemptId(new TaskID(jobId,true, 0), 0); // wait for the job to finish. 
while (!job.isComplete()) ; Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=695823&r1=695822&r2=695823&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Sep 16 05:05:18 2008 @@ -181,7 +181,9 @@ LOG.info("runWordCount"); // Run a word count example // Keeping tasks that match this pattern - jobConf.setKeepTaskFilesPattern(TaskAttemptID.getTaskAttemptIDsPattern(null, null, true, 1, null)); + String pattern = + TaskAttemptID.getTaskAttemptIDsPattern(null, null, true, 1, null, null); + jobConf.setKeepTaskFilesPattern(pattern); TestResult result; final Path inDir = new Path("./wc/input"); final Path outDir = new Path("./wc/output"); @@ -190,8 +192,9 @@ assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" + "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output); JobID jobid = result.job.getID(); - TaskAttemptID taskid = new TaskAttemptID(new TaskID(jobid, true, 1),0); - checkTaskDirectories(mr, new String[]{jobid.toString()}, new String[]{taskid.toString()}); + String taskId = mr.getTaskAttemptId(new TaskID(jobid, true, 1), 0).toString(); + checkTaskDirectories(mr, new String[]{jobid.toString()}, + new String[]{taskId}); // test with maps=0 jobConf = mr.createJobConf(); input = "owen is oom"; Modified: hadoop/core/trunk/src/webapps/job/jobhistory.jsp URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobhistory.jsp?rev=695823&r1=695822&r2=695823&view=diff ============================================================================== --- hadoop/core/trunk/src/webapps/job/jobhistory.jsp (original) +++ hadoop/core/trunk/src/webapps/job/jobhistory.jsp Tue Sep 16 05:05:18 2008 @@ -81,6 +81,8 @@ "<td>Job tracker Start time</td>" + "<td>Job Id</td><td>Name</td><td>User</td>") ; out.print("</tr>"); + + Set<String> displayedJobs = new HashSet<String>(); for (Path jobFile: jobFiles) { String decodedJobFileName = JobHistory.JobInfo.decodeJobHistoryFileName(jobFile.getName()); @@ -92,6 +94,14 @@ String user = jobDetails[5]; String jobName = jobDetails[6]; + // Check if the job is already displayed. 
There can be multiple job + // history files for jobs that have restarted + if (displayedJobs.contains(jobId)) { + continue; + } else { + displayedJobs.add(jobId); + } + // Encode the logfile name again to cancel the decoding done by the browser String encodedJobFileName = JobHistory.JobInfo.encodeJobHistoryFileName(jobFile.getName()); Modified: hadoop/core/trunk/src/webapps/job/jobtracker.jsp URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobtracker.jsp?rev=695823&r1=695822&r2=695823&view=diff ============================================================================== --- hadoop/core/trunk/src/webapps/job/jobtracker.jsp (original) +++ hadoop/core/trunk/src/webapps/job/jobtracker.jsp Tue Sep 16 05:05:18 2008 @@ -152,6 +152,18 @@ "</td><td>" + status.getMaxReduceTasks() + "</td><td>" + tasksPerNode + "</td></tr></table>\n"); + + out.print("<br>"); + if (tracker.hasRestarted()) { + out.print("<span class=\"small\"><i>"); + if (tracker.hasRecovered()) { + out.print("The JobTracker got restarted and recovered back in " ); + out.print(StringUtils.formatTime(tracker.getRecoveryDuration())); + } else { + out.print("The JobTracker got restarted and is still recovering"); + } + out.print("</i></span>"); + } }%> <[EMAIL PROTECTED] import="org.apache.hadoop.hdfs.server.namenode.JspHelper"%>
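
The jobtracker.jsp and jobhistory.jsp changes above surface restart/recovery state to the user; on the client side, the tests keep re-using one idiom for the equivalent check: wait until the restarted jobtracker reports RUNNING, then look the job id up in getAllJobs(). A condensed sketch of that client-side check, using only the JobClient calls that appear in the tests (class and method names here are illustrative, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.mapred.ClusterStatus;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.JobStatus;
    import org.apache.hadoop.mapred.JobTracker;

    public class RecoveryCheck {
      /** Returns true once the restarted jobtracker is RUNNING and still knows the job. */
      public static boolean jobSurvivedRestart(JobConf conf, JobID id)
          throws IOException, InterruptedException {
        JobClient client = new JobClient(conf);
        // Block until the jobtracker has started up (and, if enabled, recovered).
        ClusterStatus status = client.getClusterStatus();
        while (status.getJobTrackerState() != JobTracker.State.RUNNING) {
          Thread.sleep(100);
          status = client.getClusterStatus();
        }
        // A recovered job shows up again in the full job list.
        for (JobStatus job : client.getAllJobs()) {
          if (job.getJobID().equals(id)) {
            return true;
          }
        }
        return false;
      }
    }
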
