Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryVersion.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryVersion.java?rev=747297&r1=747296&r2=747297&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryVersion.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryVersion.java Tue Feb 24 08:01:02 2009
@@ -27,6 +27,10 @@
 import org.apache.hadoop.mapred.JobHistory.JobInfo;
 import org.apache.hadoop.mapred.JobHistory.RecordTypes;
 
+/**
+ * Tests the JobHistory parser with different versions of job history files.
+ * This test may need to change when new job history file versions are introduced.
+ */
 public class TestJobHistoryVersion extends TestCase {
   private static final String HOSTNAME = "localhost";
   private static final String TIME= "1234567890123";
@@ -48,13 +52,16 @@
              "test-history-version");
   private static final String DELIM = ".";
   
-  
-  private void writeHistoryFile(FSDataOutputStream out, boolean old)
+  /**
+   * Creates a job history file of the given version. This method should
+   * change if the format or content of future job history file versions changes.
+   */
+  private void writeHistoryFile(FSDataOutputStream out, long version)
   throws IOException {
-    String delim = "\n";
+    String delim = "\n"; // '\n' for version 0
     String counters = COUNTERS;
     String jobConf = "job.xml";
-    if (!old) {
+    if (version > 0) { // line delimiter should be '.' for later versions
       // Change the delimiter
       delim = DELIM + delim;
       
@@ -111,58 +118,35 @@
   }
   
   /**
-   * Tests the JobHistory parser with old files
+   * Tests the JobHistory parser with different versions of job history files
    */
-  public void testJobHistoryWithoutVersion() throws IOException {
-    JobConf conf = new JobConf();
-    FileSystem fs = FileSystem.getLocal(conf);
-    
-    // cleanup
-    fs.delete(TEST_DIR, true);
+  public void testJobHistoryVersion() throws IOException {
+    // If a new job history version is introduced, the updated parser may fail
+    // on the history files created by writeHistoryFile().
+    for (long version = 0; version <= JobHistory.VERSION; version++) {
+      JobConf conf = new JobConf();
+      FileSystem fs = FileSystem.getLocal(conf);
     
-    Path historyPath = new Path(TEST_DIR + "/_logs/history/" + FILENAME);
+      // cleanup
+      fs.delete(TEST_DIR, true);
     
-    fs.delete(historyPath, false);
+      Path historyPath = new Path(TEST_DIR + "/_logs/history/" +
+                                  FILENAME + version);
     
-    FSDataOutputStream out = fs.create(historyPath);
-    writeHistoryFile(out, true);
-    out.close();
+      fs.delete(historyPath, false);
     
-    JobInfo job = new JobHistory.JobInfo(JOB); 
-    DefaultJobHistoryParser.parseJobTasks(historyPath.toString(), job, fs);
+      FSDataOutputStream out = fs.create(historyPath);
+      writeHistoryFile(out, version);
+      out.close();
     
-    assertTrue("Failed to parse old jobhistory files", 
-               job.getAllTasks().size() > 0);
+      JobInfo job = new JobHistory.JobInfo(JOB); 
+      DefaultJobHistoryParser.parseJobTasks(historyPath.toString(), job, fs);
     
-    // cleanup
-    fs.delete(TEST_DIR, true);
-  }
-  
-  /**
-   * Tests the JobHistory parser with new file
-   */
-  public void testJobHistoryWithVersion() throws IOException {
-    JobConf conf = new JobConf();
-    FileSystem fs = FileSystem.getLocal(conf);
+      assertTrue("Failed to parse jobhistory files of version " + version,
+                 job.getAllTasks().size() > 0);
     
-    // cleanup
-    fs.delete(TEST_DIR, true);
-    
-    Path historyPath = new Path(TEST_DIR + "/_logs/history/" + FILENAME);
-    
-    fs.delete(historyPath, false);
-    
-    FSDataOutputStream out = fs.create(historyPath);
-    writeHistoryFile(out, false);
-    out.close();
-    
-    JobInfo job = new JobHistory.JobInfo(JOB); 
-    DefaultJobHistoryParser.parseJobTasks(historyPath.toString(), job, fs);
-    
-    assertTrue("Failed to parse old jobhistory files", 
-               job.getAllTasks().size() > 0);
-    
-    // cleanup
-    fs.delete(TEST_DIR, true);
+      // cleanup
+      fs.delete(TEST_DIR, true);
+    }
   }
 }

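As the new Javadoc warns, writeHistoryFile() must grow a branch each time the
history format changes. A minimal sketch of that extension point, assuming a
hypothetical version 2 (the version-2 branch and NEW_DELIM below are
illustrations only, not part of this patch):

    // Sketch only: versions 0 and 1 follow the patch; version 2 is invented.
    private void writeHistoryFile(FSDataOutputStream out, long version)
    throws IOException {
      String delim = "\n";          // '\n' for version 0
      if (version > 0) {
        delim = DELIM + delim;      // '.' line delimiter from version 1 on
      }
      if (version > 1) {
        // hypothetical: apply whatever a future version 2 changes, e.g.
        // delim = NEW_DELIM + delim;
      }
      // ... write the job history records using 'delim', as in the hunk above ...
    }
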
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java?rev=747297&r1=747296&r2=747297&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java Tue Feb 24 08:01:02 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.util.ArrayList;
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
@@ -39,6 +40,9 @@
     LogFactory.getLog(TestJobInProgressListener.class);
   private final Path testDir = new Path("test-jip-listener-update");
   
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+          "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+
   private JobConf configureJob(JobConf conf, int m, int r, 
                                Path inDir, Path outputDir,
                                String mapSignalFile, String redSignalFile) 
@@ -267,9 +271,13 @@
     
     mr.getJobTrackerRunner().getJobTracker()
       .addJobInProgressListener(myListener);
-    
-    // submit and kill the job   
-    JobID id = TestJobKillAndFail.runJobFail(job);
+
+    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/input");
+    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/output");
+
+    // submit a job that fails 
+    RunningJob rJob = UtilsForTests.runJobFail(job, inDir, outDir);
+    JobID id = rJob.getID();
 
     // check if the job failure was notified
     assertFalse("Missing event notification on failing a running job", 
@@ -288,8 +296,12 @@
     mr.getJobTrackerRunner().getJobTracker()
       .addJobInProgressListener(myListener);
     
+    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/input");
+    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/output");
+
     // submit and kill the job   
-    JobID id = TestJobKillAndFail.runJobKill(job);
+    RunningJob rJob = UtilsForTests.runJobKill(job, inDir, outDir);
+    JobID id = rJob.getID();
 
     // check if the job failure was notified
     assertFalse("Missing event notification on killing a running job", 
@@ -308,8 +320,11 @@
     mr.getJobTrackerRunner().getJobTracker()
       .addJobInProgressListener(myListener);
     
+    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
+    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
+
     // submit the job   
-    RunningJob rJob = TestJobKillAndFail.runJob(job);
+    RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
     
     // wait for the job to be running
     while (rJob.getJobState() != JobStatus.RUNNING) {
@@ -363,7 +378,10 @@
     mr.getJobTrackerRunner().getJobTracker()
       .addJobInProgressListener(myListener);
     
-    RunningJob rJob = TestJobKillAndFail.runJob(job);
+    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
+    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
+
+    RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
     JobID id = rJob.getID();
     LOG.info("Job : " + id.toString() + " submitted");
     

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=747297&r1=747296&r2=747297&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java Tue Feb 24 08:01:02 2009
@@ -18,18 +18,12 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 
 import junit.framework.TestCase;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /**
  * A JUnit test to test Kill Job & Fail Job functionality with local file
@@ -40,86 +34,6 @@
   private static String TEST_ROOT_DIR = new File(System.getProperty(
       "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
 
-  static JobID runJobFail(JobConf conf) throws IOException {
-
-    conf.setJobName("testjobfail");
-    conf.setMapperClass(FailMapper.class);
-
-    RunningJob job = runJob(conf);
-    while (!job.isComplete()) {
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        break;
-      }
-    }
-    // Checking that the Job got failed
-    assertEquals(job.getJobState(), JobStatus.FAILED);
-    
-    return job.getID();
-  }
-
-  static JobID runJobKill(JobConf conf) throws IOException {
-
-    conf.setJobName("testjobkill");
-    conf.setMapperClass(KillMapper.class);
-
-    RunningJob job = runJob(conf);
-    while (job.getJobState() != JobStatus.RUNNING) {
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        break;
-      }
-    }
-    job.killJob();
-    while (job.cleanupProgress() == 0.0f) {
-      try {
-        Thread.sleep(10);
-      } catch (InterruptedException ie) {
-        break;
-      }
-    }
-    // Checking that the Job got killed
-    assertTrue(job.isComplete());
-    assertEquals(job.getJobState(), JobStatus.KILLED);
-    
-    return job.getID();
-  }
-
-  static RunningJob runJob(JobConf conf) throws IOException {
-
-    final Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
-    final Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
-
-    // run the dummy sleep map
-    FileSystem fs = FileSystem.get(conf);
-    fs.delete(outDir, true);
-    if (!fs.exists(inDir)) {
-      fs.mkdirs(inDir);
-    }
-    String input = "The quick brown fox\n" + "has many silly\n"
-        + "red fox sox\n";
-    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
-    file.writeBytes(input);
-    file.close();
-
-    conf.setInputFormat(TextInputFormat.class);
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(IntWritable.class);
-
-    FileInputFormat.setInputPaths(conf, inDir);
-    FileOutputFormat.setOutputPath(conf, outDir);
-    conf.setNumMapTasks(1);
-    conf.setNumReduceTasks(0);
-
-    JobClient jobClient = new JobClient(conf);
-    RunningJob job = jobClient.submitJob(conf);
-
-    return job;
-
-  }
-
   public void testJobFailAndKill() throws IOException {
     MiniMRCluster mr = null;
     try {
@@ -127,38 +41,21 @@
 
       // run the TCs
       JobConf conf = mr.createJobConf();
-      runJobFail(conf);
-      runJobKill(conf);
+
+      Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
+      Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
+      RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
+      // Checking that the Job got failed
+      assertEquals(job.getJobState(), JobStatus.FAILED);
+
+      job = UtilsForTests.runJobKill(conf, inDir, outDir);
+      // Checking that the Job got killed
+      assertTrue(job.isComplete());
+      assertEquals(job.getJobState(), JobStatus.KILLED);
     } finally {
       if (mr != null) {
         mr.shutdown();
       }
     }
   }
-
-  static class FailMapper extends MapReduceBase implements
-      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
-
-    public void map(WritableComparable key, Writable value,
-        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
-        throws IOException {
-
-      throw new RuntimeException("failing map");
-    }
-  }
-
-  static class KillMapper extends MapReduceBase implements
-      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
-
-    public void map(WritableComparable key, Writable value,
-        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
-        throws IOException {
-
-      try {
-        Thread.sleep(100000);
-      } catch (InterruptedException e) {
-        // Do nothing
-      }
-    }
-  }
 }

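The shared helpers now return the RunningJob and leave assertions to each
caller, which is what lets TestJobInProgressListener reuse them without
inheriting this test's pass/fail criteria. A minimal sketch of the resulting
call-site contract (conf, inDir and outDir set up as in testJobFailAndKill
above):

    // Each caller asserts its own expectations on the returned job.
    RunningJob failed = UtilsForTests.runJobFail(conf, inDir, outDir);
    assertEquals(JobStatus.FAILED, failed.getJobState());

    RunningJob killed = UtilsForTests.runJobKill(conf, inDir, outDir);
    assertTrue(killed.isComplete());
    assertEquals(JobStatus.KILLED, killed.getJobState());
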
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=747297&r1=747296&r2=747297&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Tue Feb 24 08:01:02 2009
@@ -286,6 +286,10 @@
     
     testTaskCompletionEvents(jtEvents, trackerEvents, true, 2 * numMaps);
     
+    // validate the history file
+    TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true);
+    TestJobHistory.validateJobHistoryFileContent(mr, job, newConf);
+    
     // check if the cluster status is insane
     ClusterStatus status = jobClient.getClusterStatus();
     assertTrue("Cluster status is insane", 

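The same two-step history check recurs in the lost-tracker tests below. In
sketch form, with the signatures inferred from these call sites (a JobID id,
the job's JobConf conf, the MiniMRCluster mr, and the RunningJob rJob):

    // 1. The history file is well-formed and records a SUCCESS outcome.
    TestJobHistory.validateJobHistoryFileFormat(id, conf, "SUCCESS", true);
    // 2. Its contents agree with what the live cluster reported for the job.
    TestJobHistory.validateJobHistoryFileContent(mr, rJob, conf);
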
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=747297&r1=747296&r2=747297&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Tue Feb 24 08:01:02 2009
@@ -112,6 +112,10 @@
       mr.getJobTrackerRunner().getJobTracker().getJob(id).failedMapTasks;
     assertTrue("Tasks that were run on the lost tracker were not killed", 
                failedMaps > 0);
+
+    // validate the history file
+    TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
+    TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
   }
   
   public void testRestartWithLostTracker() throws IOException {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java?rev=747297&r1=747296&r2=747297&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLostTracker.java Tue Feb 24 08:01:02 2009
@@ -100,6 +100,9 @@
       testTaskStatuses(taskInProgress.getTaskStatuses());
     }
     
+    // validate the history file
+    TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
+    TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
   }
   
   private void testTaskStatuses(TaskStatus[] tasks) {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=747297&r1=747296&r2=747297&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java Tue Feb 24 08:01:02 2009
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.text.DecimalFormat;
@@ -37,9 +38,12 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 
 /** 
@@ -530,4 +534,129 @@
       return new RandomRecordReader(((FileSplit) split).getPath());
     }
   }
+
+  // Start a job and return its RunningJob object
+  static RunningJob runJob(JobConf conf, Path inDir, Path outDir)
+                    throws IOException {
+
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(outDir, true);
+    if (!fs.exists(inDir)) {
+      fs.mkdirs(inDir);
+    }
+    String input = "The quick brown fox\n" + "has many silly\n"
+        + "red fox sox\n";
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    file.writeBytes(input);
+    file.close();
+
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+
+    JobClient jobClient = new JobClient(conf);
+    RunningJob job = jobClient.submitJob(conf);
+
+    return job;
+  }
+
+  // Run a job that will succeed and wait until it completes
+  static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
+         throws IOException {
+    conf.setJobName("test-job-succeed");
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    
+    RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+    while (!job.isComplete()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+
+    return job;
+  }
+
+  // Run a job that will fail and wait until it completes
+  static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
+         throws IOException {
+    conf.setJobName("test-job-fail");
+    conf.setMapperClass(FailMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    
+    RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+    while (!job.isComplete()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+
+    return job;
+  }
+
+  // Run a job that will be killed and wait until it completes
+  static RunningJob runJobKill(JobConf conf, Path inDir, Path outDir)
+         throws IOException {
+
+    conf.setJobName("test-job-kill");
+    conf.setMapperClass(KillMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    
+    RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+    while (job.getJobState() != JobStatus.RUNNING) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+    job.killJob();
+    while (job.cleanupProgress() == 0.0f) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {
+        break;
+      }
+    }
+
+    return job;
+  }
+
+  // Mapper that fails
+  static class FailMapper extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+
+      throw new RuntimeException("failing map");
+    }
+  }
+
+  // Mapper that sleeps for a long time.
+  // Used for running a job that will be killed
+  static class KillMapper extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+
+      try {
+        Thread.sleep(1000000);
+      } catch (InterruptedException e) {
+        // Do nothing
+      }
+    }
+  }
 }

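Of the three new wrappers, runJobSucceed() has no caller in this patch. A
usage sketch under assumed names (the directory paths below are invented;
JobStatus.SUCCEEDED is the expected terminal state):

    // Run an identity map/reduce job to completion and verify it succeeded.
    Path inDir = new Path(TEST_ROOT_DIR + "/succeedjob/input");    // assumed path
    Path outDir = new Path(TEST_ROOT_DIR + "/succeedjob/output");  // assumed path
    RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
    assertTrue(job.isComplete());
    assertEquals(JobStatus.SUCCEEDED, job.getJobState());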
