Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryVersion.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryVersion.java?rev=747297&r1=747296&r2=747297&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryVersion.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryVersion.java Tue Feb 24 08:01:02 2009 @@ -27,6 +27,10 @@ import org.apache.hadoop.mapred.JobHistory.JobInfo; import org.apache.hadoop.mapred.JobHistory.RecordTypes; +/** + * Tests the JobHistory parser with different versions of job history files. + * This may have to change when new versions of job history files come up. + */ public class TestJobHistoryVersion extends TestCase { private static final String HOSTNAME = "localhost"; private static final String TIME= "1234567890123"; @@ -48,13 +52,16 @@ "test-history-version"); private static final String DELIM = "."; - - private void writeHistoryFile(FSDataOutputStream out, boolean old) + /** + * Creates a job history file of a given specific version. This method should + * change if format/content of future versions of job history file changes. + */ + private void writeHistoryFile(FSDataOutputStream out, long version) throws IOException { - String delim = "\n"; + String delim = "\n"; // '\n' for version 0 String counters = COUNTERS; String jobConf = "job.xml"; - if (!old) { + if (version > 0) { // line delimeter should be '.' for later versions // Change the delimiter delim = DELIM + delim; @@ -111,58 +118,35 @@ } /** - * Tests the JobHistory parser with old files + * Tests the JobHistory parser with different versions of job history files */ - public void testJobHistoryWithoutVersion() throws IOException { - JobConf conf = new JobConf(); - FileSystem fs = FileSystem.getLocal(conf); - - // cleanup - fs.delete(TEST_DIR, true); + public void testJobHistoryVersion() throws IOException { + // If new job history version comes up, the modified parser may fail for + // the history file created by writeHistoryFile(). + for (long version = 0; version <= JobHistory.VERSION; version++) { + JobConf conf = new JobConf(); + FileSystem fs = FileSystem.getLocal(conf); - Path historyPath = new Path(TEST_DIR + "/_logs/history/" + FILENAME); + // cleanup + fs.delete(TEST_DIR, true); - fs.delete(historyPath, false); + Path historyPath = new Path(TEST_DIR + "/_logs/history/" + + FILENAME + version); - FSDataOutputStream out = fs.create(historyPath); - writeHistoryFile(out, true); - out.close(); + fs.delete(historyPath, false); - JobInfo job = new JobHistory.JobInfo(JOB); - DefaultJobHistoryParser.parseJobTasks(historyPath.toString(), job, fs); + FSDataOutputStream out = fs.create(historyPath); + writeHistoryFile(out, version); + out.close(); - assertTrue("Failed to parse old jobhistory files", - job.getAllTasks().size() > 0); + JobInfo job = new JobHistory.JobInfo(JOB); + DefaultJobHistoryParser.parseJobTasks(historyPath.toString(), job, fs); - // cleanup - fs.delete(TEST_DIR, true); - } - - /** - * Tests the JobHistory parser with new file - */ - public void testJobHistoryWithVersion() throws IOException { - JobConf conf = new JobConf(); - FileSystem fs = FileSystem.getLocal(conf); + assertTrue("Failed to parse jobhistory files of version " + version, + job.getAllTasks().size() > 0); - // cleanup - fs.delete(TEST_DIR, true); - - Path historyPath = new Path(TEST_DIR + "/_logs/history/" + FILENAME); - - fs.delete(historyPath, false); - - FSDataOutputStream out = fs.create(historyPath); - writeHistoryFile(out, false); - out.close(); - - JobInfo job = new JobHistory.JobInfo(JOB); - DefaultJobHistoryParser.parseJobTasks(historyPath.toString(), job, fs); - - assertTrue("Failed to parse old jobhistory files", - job.getAllTasks().size() > 0); - - // cleanup - fs.delete(TEST_DIR, true); + // cleanup + fs.delete(TEST_DIR, true); + } } }
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java?rev=747297&r1=747296&r2=747297&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java Tue Feb 24 08:01:02 2009 @@ -19,6 +19,7 @@ package org.apache.hadoop.mapred; import java.util.ArrayList; +import java.io.File; import java.io.IOException; import java.util.List; @@ -39,6 +40,9 @@ LogFactory.getLog(TestJobInProgressListener.class); private final Path testDir = new Path("test-jip-listener-update"); + private static String TEST_ROOT_DIR = new File(System.getProperty( + "test.build.data", "/tmp")).toURI().toString().replace(' ', '+'); + private JobConf configureJob(JobConf conf, int m, int r, Path inDir, Path outputDir, String mapSignalFile, String redSignalFile) @@ -267,9 +271,13 @@ mr.getJobTrackerRunner().getJobTracker() .addJobInProgressListener(myListener); - - // submit and kill the job - JobID id = TestJobKillAndFail.runJobFail(job); + + Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/input"); + Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/output"); + + // submit a job that fails + RunningJob rJob = UtilsForTests.runJobFail(job, inDir, outDir); + JobID id = rJob.getID(); // check if the job failure was notified assertFalse("Missing event notification on failing a running job", @@ -288,8 +296,12 @@ mr.getJobTrackerRunner().getJobTracker() .addJobInProgressListener(myListener); + Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/input"); + Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/output"); + // submit and kill the job - JobID id = TestJobKillAndFail.runJobKill(job); + RunningJob rJob = UtilsForTests.runJobKill(job, inDir, outDir); + JobID id = rJob.getID(); // check if the job failure was notified assertFalse("Missing event notification on killing a running job", @@ -308,8 +320,11 @@ mr.getJobTrackerRunner().getJobTracker() .addJobInProgressListener(myListener); + Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input"); + Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output"); + // submit the job - RunningJob rJob = TestJobKillAndFail.runJob(job); + RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir); // wait for the job to be running while (rJob.getJobState() != JobStatus.RUNNING) { @@ -363,7 +378,10 @@ mr.getJobTrackerRunner().getJobTracker() .addJobInProgressListener(myListener); - RunningJob rJob = TestJobKillAndFail.runJob(job); + Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input"); + Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output"); + + RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir); JobID id = rJob.getID(); LOG.info("Job : " + id.toString() + " submitted"); Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=747297&r1=747296&r2=747297&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java Tue Feb 24 08:01:02 2009 @@ -18,18 +18,12 @@ package org.apache.hadoop.mapred; -import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import junit.framework.TestCase; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; /** * A JUnit test to test Kill Job & Fail Job functionality with local file @@ -40,86 +34,6 @@ private static String TEST_ROOT_DIR = new File(System.getProperty( "test.build.data", "/tmp")).toURI().toString().replace(' ', '+'); - static JobID runJobFail(JobConf conf) throws IOException { - - conf.setJobName("testjobfail"); - conf.setMapperClass(FailMapper.class); - - RunningJob job = runJob(conf); - while (!job.isComplete()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - break; - } - } - // Checking that the Job got failed - assertEquals(job.getJobState(), JobStatus.FAILED); - - return job.getID(); - } - - static JobID runJobKill(JobConf conf) throws IOException { - - conf.setJobName("testjobkill"); - conf.setMapperClass(KillMapper.class); - - RunningJob job = runJob(conf); - while (job.getJobState() != JobStatus.RUNNING) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - break; - } - } - job.killJob(); - while (job.cleanupProgress() == 0.0f) { - try { - Thread.sleep(10); - } catch (InterruptedException ie) { - break; - } - } - // Checking that the Job got killed - assertTrue(job.isComplete()); - assertEquals(job.getJobState(), JobStatus.KILLED); - - return job.getID(); - } - - static RunningJob runJob(JobConf conf) throws IOException { - - final Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input"); - final Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output"); - - // run the dummy sleep map - FileSystem fs = FileSystem.get(conf); - fs.delete(outDir, true); - if (!fs.exists(inDir)) { - fs.mkdirs(inDir); - } - String input = "The quick brown fox\n" + "has many silly\n" - + "red fox sox\n"; - DataOutputStream file = fs.create(new Path(inDir, "part-0")); - file.writeBytes(input); - file.close(); - - conf.setInputFormat(TextInputFormat.class); - conf.setOutputKeyClass(Text.class); - conf.setOutputValueClass(IntWritable.class); - - FileInputFormat.setInputPaths(conf, inDir); - FileOutputFormat.setOutputPath(conf, outDir); - conf.setNumMapTasks(1); - conf.setNumReduceTasks(0); - - JobClient jobClient = new JobClient(conf); - RunningJob job = jobClient.submitJob(conf); - - return job; - - } - public void testJobFailAndKill() throws IOException { MiniMRCluster mr = null; try { @@ -127,38 +41,21 @@ // run the TCs JobConf conf = mr.createJobConf(); - runJobFail(conf); - runJobKill(conf); + + Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input"); + Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output"); + RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir); + // Checking that the Job got failed + assertEquals(job.getJobState(), JobStatus.FAILED); + + job = UtilsForTests.runJobKill(conf, inDir, outDir); + // Checking that the Job got killed + assertTrue(job.isComplete()); + assertEquals(job.getJobState(), JobStatus.KILLED); } finally { if (mr != null) { mr.shutdown(); } } } - - static class FailMapper extends MapReduceBase implements - Mapper<WritableComparable, Writable, WritableComparable, Writable> { - - public void map(WritableComparable key, Writable value, - OutputCollector<WritableComparable, Writable> out, Reporter reporter) - throws IOException { - - throw new RuntimeException("failing map"); - } - } - - static class KillMapper extends MapReduceBase implements - Mapper<WritableComparable, Writable, WritableComparable, Writable> { - - public void map(WritableComparable key, Writable value, - OutputCollector<WritableComparable, Writable> out, Reporter reporter) - throws IOException { - - try { - Thread.sleep(100000); - } catch (InterruptedException e) { - // Do nothing - } - } - } } Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=747297&r1=747296&r2=747297&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Tue Feb 24 08:01:02 2009 @@ -286,6 +286,10 @@ testTaskCompletionEvents(jtEvents, trackerEvents, true, 2 * numMaps); + // validate the history file + TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true); + TestJobHistory.validateJobHistoryFileContent(mr, job, newConf); + // check if the cluster status is insane ClusterStatus status = jobClient.getClusterStatus(); assertTrue("Cluster status is insane", Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=747297&r1=747296&r2=747297&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Tue Feb 24 08:01:02 2009 @@ -112,6 +112,10 @@ mr.getJobTrackerRunner().getJobTracker().getJob(id).failedMapTasks; assertTrue("Tasks that were run on the lost tracker were not killed", failedMaps > 0); + + // validate the history file + TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true); + TestJobHistory.validateJobHistoryFileContent(mr, rJob, job); } public void testRestartWithLostTracker() throws IOException { Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java?rev=747297&r1=747296&r2=747297&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java Tue Feb 24 08:01:02 2009 @@ -100,6 +100,9 @@ testTaskStatuses(taskInProgress.getTaskStatuses()); } + // validate the history file + TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true); + TestJobHistory.validateJobHistoryFileContent(mr, rJob, job); } private void testTaskStatuses(TaskStatus[] tasks) { Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=747297&r1=747296&r2=747297&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java Tue Feb 24 08:01:02 2009 @@ -20,6 +20,7 @@ import java.io.File; import java.io.FileInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.text.DecimalFormat; @@ -37,9 +38,12 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat; +import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; /** @@ -530,4 +534,129 @@ return new RandomRecordReader(((FileSplit) split).getPath()); } } + + // Start a job and return its RunningJob object + static RunningJob runJob(JobConf conf, Path inDir, Path outDir) + throws IOException { + + FileSystem fs = FileSystem.get(conf); + fs.delete(outDir, true); + if (!fs.exists(inDir)) { + fs.mkdirs(inDir); + } + String input = "The quick brown fox\n" + "has many silly\n" + + "red fox sox\n"; + DataOutputStream file = fs.create(new Path(inDir, "part-0")); + file.writeBytes(input); + file.close(); + + conf.setInputFormat(TextInputFormat.class); + conf.setOutputKeyClass(LongWritable.class); + conf.setOutputValueClass(Text.class); + + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + conf.setNumMapTasks(1); + conf.setNumReduceTasks(1); + + JobClient jobClient = new JobClient(conf); + RunningJob job = jobClient.submitJob(conf); + + return job; + } + + // Run a job that will be succeeded and wait until it completes + static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir) + throws IOException { + conf.setJobName("test-job-succeed"); + conf.setMapperClass(IdentityMapper.class); + conf.setReducerClass(IdentityReducer.class); + + RunningJob job = UtilsForTests.runJob(conf, inDir, outDir); + while (!job.isComplete()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + + return job; + } + + // Run a job that will be failed and wait until it completes + static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir) + throws IOException { + conf.setJobName("test-job-fail"); + conf.setMapperClass(FailMapper.class); + conf.setReducerClass(IdentityReducer.class); + + RunningJob job = UtilsForTests.runJob(conf, inDir, outDir); + while (!job.isComplete()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + + return job; + } + + // Run a job that will be killed and wait until it completes + static RunningJob runJobKill(JobConf conf, Path inDir, Path outDir) + throws IOException { + + conf.setJobName("test-job-kill"); + conf.setMapperClass(KillMapper.class); + conf.setReducerClass(IdentityReducer.class); + + RunningJob job = UtilsForTests.runJob(conf, inDir, outDir); + while (job.getJobState() != JobStatus.RUNNING) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + job.killJob(); + while (job.cleanupProgress() == 0.0f) { + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + break; + } + } + + return job; + } + + // Mapper that fails + static class FailMapper extends MapReduceBase implements + Mapper<WritableComparable, Writable, WritableComparable, Writable> { + + public void map(WritableComparable key, Writable value, + OutputCollector<WritableComparable, Writable> out, Reporter reporter) + throws IOException { + + throw new RuntimeException("failing map"); + } + } + + // Mapper that sleeps for a long time. + // Used for running a job that will be killed + static class KillMapper extends MapReduceBase implements + Mapper<WritableComparable, Writable, WritableComparable, Writable> { + + public void map(WritableComparable key, Writable value, + OutputCollector<WritableComparable, Writable> out, Reporter reporter) + throws IOException { + + try { + Thread.sleep(1000000); + } catch (InterruptedException e) { + // Do nothing + } + } + } }
