Author: acmurthy
Date: Sat Jan 12 18:49:01 2008
New Revision: 611527

URL: http://svn.apache.org/viewvc?rev=611527&view=rev
Log:
HADOOP-1876. Persist statuses of completed jobs in HDFS so that the JobClient 
can query and get information about decommissioned jobs and also across 
JobTracker restarts. Contributed by Alejandro Abdelnur.
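
With persistence enabled on the JobTracker, a client can still look a job up after it has been retired from memory or after a JobTracker restart. A minimal sketch of that client-side query (the jobId is assumed to come from an earlier submission):

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;

    // Sketch: query a job the JobTracker no longer holds in memory; with
    // persistence enabled the answer is read back from DFS instead of null.
    JobClient jc = new JobClient(new JobConf());
    RunningJob job = jc.getJob(jobId);   // jobId from an earlier submission (assumed)
    if (job != null) {
      System.out.println(job.getJobName() + " successful: " + job.isSuccessful());
    }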

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=611527&r1=611526&r2=611527&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Sat Jan 12 18:49:01 2008
@@ -217,6 +217,16 @@
     HADOOP-2464. Unit tests for chmod, chown, and chgrp using DFS.
     (Raghu Angadi)
 
+    HADOOP-1876. Persist statuses of completed jobs in HDFS so that the
+    JobClient can query and get information about decommissioned jobs and also
+    across JobTracker restarts.
+    Configuration changes to hadoop-default.xml:
+      add mapred.job.tracker.persist.jobstatus.active (default value of false)
+      add mapred.job.tracker.persist.jobstatus.hours (default value of 0)
+      add mapred.job.tracker.persist.jobstatus.dir (default value of
+                                                    /jobtracker/jobsInfo)
+    (Alejandro Abdelnur via acmurthy) 
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=611527&r1=611526&r2=611527&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Sat Jan 12 18:49:01 2008
@@ -893,6 +893,33 @@
   </description>
 </property>
 
+  <property>
+    <name>mapred.job.tracker.persist.jobstatus.active</name>
+    <value>false</value>
+    <description>Indicates if persistency of job status information is
+      active or not.
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.job.tracker.persist.jobstatus.hours</name>
+    <value>0</value>
+    <description>The number of hours job status information is persisted in
+      DFS. The job status information will be available after it drops off the
+      memory queue and between jobtracker restarts. With a zero value the job
+      status information is not persisted at all in DFS.
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.job.tracker.persist.jobstatus.dir</name>
+    <value>/jobtracker/jobsInfo</value>
+    <description>The directory where the job status information is persisted
+      in a file system to be available after it drops off the memory queue and
+      between jobtracker restarts.
+    </description>
+  </property>
+
 <!-- ipc properties -->
 
 <property>

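The same properties can also be set programmatically (the new TestJobStatusPersistency test passes them to its mini cluster as Properties). A minimal sketch of a Configuration-based setup; the 24 hour retention value is illustrative, not a default:

    import org.apache.hadoop.mapred.JobConf;

    // Sketch: enable persistence of completed job statuses with a 24 hour
    // retention; in a real deployment these would normally be set in
    // hadoop-site.xml on the JobTracker node.
    JobConf conf = new JobConf();
    conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", true);
    conf.setInt("mapred.job.tracker.persist.jobstatus.hours", 24);
    conf.set("mapred.job.tracker.persist.jobstatus.dir", "/jobtracker/jobsInfo");
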
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java?rev=611527&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java Sat Jan 12 18:49:01 2008
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */  
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+import java.io.IOException;
+
+/**
+ * Persists and retrieves the job info of a job into/from DFS.
+ * <p/>
+ * If the retain time is zero, jobs are not persisted.
+ * <p/>
+ * A daemon thread cleans up job info files older than the retain time.
+ * <p/>
+ * The retain time can be set with the
+ * 'mapred.job.tracker.persist.jobstatus.hours' configuration variable
+ * (it is in hours).
+ */
+public class CompletedJobStatusStore implements Runnable {
+  private boolean active;
+  private String jobInfoDir;
+  private long retainTime;
+  private FileSystem fs;
+  private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
+
+  public static final Log LOG =
+          LogFactory.getLog(CompletedJobStatusStore.class);
+
+  private static long HOUR = 1000 * 60 * 60;
+  private static long SLEEP_TIME = 1 * HOUR;
+
+  CompletedJobStatusStore(Configuration conf) throws IOException {
+    active =
+      conf.getBoolean("mapred.job.tracker.persist.jobstatus.active", false);
+
+    if (active) {
+      fs = FileSystem.get(conf);
+      retainTime =
+        conf.getInt("mapred.job.tracker.persist.jobstatus.hours", 0) * HOUR;
+
+      jobInfoDir =
+        conf.get("mapred.job.tracker.persist.jobstatus.dir", 
JOB_INFO_STORE_DIR);
+
+      Path path = new Path(jobInfoDir);
+      if (!fs.exists(path)) {
+        fs.mkdirs(path);
+      }
+
+      if (retainTime == 0) {
+        // as retain time is zero, all stored jobstatuses are deleted.
+        deleteJobStatusDirs();
+      }
+    }
+  }
+
+  /**
+   * Indicates if job status persistency is active or not.
+   *
+   * @return TRUE if active, FALSE otherwise.
+   */
+  public boolean isActive() {
+    return active;
+  }
+
+  public void run() {
+    if (retainTime > 0) {
+      while (true) {
+        deleteJobStatusDirs();
+        try {
+          Thread.sleep(SLEEP_TIME);
+        }
+        catch (InterruptedException ex) {
+          break;
+        }
+      }
+    }
+  }
+
+  private void deleteJobStatusDirs() {
+    try {
+      long currentTime = System.currentTimeMillis();
+      Path[] jobInfoFiles = fs.listPaths(
+              new Path[]{new Path(jobInfoDir)});
+
+      //noinspection ForLoopReplaceableByForEach
+      for (Path jobInfo : jobInfoFiles) {
+        try {
+          FileStatus status = fs.getFileStatus(jobInfo);
+          if ((currentTime - status.getModificationTime()) > retainTime) {
+            fs.delete(jobInfo);
+          }
+        }
+        catch (IOException ie) {
+          LOG.warn("Could not do housekeeping for [ " +
+                  jobInfo + "] job info : " + ie.getMessage(), ie);
+        }
+      }
+    }
+    catch (IOException ie) {
+      LOG.warn("Could not obtain job info files : " + ie.getMessage(), ie);
+    }
+  }
+
+  private Path getInfoFilePath(String jobId) {
+    return new Path(jobInfoDir, jobId + ".info");
+  }
+  
+  /**
+   * Persists a job in DFS.
+   *
+   * @param job the job about to be 'retired'
+   */
+  public void store(JobInProgress job) {
+    if (active && retainTime > 0) {
+      String jobId = job.getStatus().getJobId();
+      Path jobStatusFile = getInfoFilePath(jobId);
+      try {
+        FSDataOutputStream dataOut = fs.create(jobStatusFile);
+
+        job.getStatus().write(dataOut);
+
+        job.getProfile().write(dataOut);
+
+        job.getCounters().write(dataOut);
+
+        TaskCompletionEvent[] events = 
+                job.getTaskCompletionEvents(0, Integer.MAX_VALUE);
+        dataOut.writeInt(events.length);
+        for (TaskCompletionEvent event : events) {
+          event.write(dataOut);
+        }
+
+        dataOut.close();
+      } catch (IOException ex) {
+        LOG.warn("Could not store [" + jobId + "] job info : " +
+                 ex.getMessage(), ex);
+        try {
+          fs.delete(jobStatusFile);
+        }
+        catch (IOException ex1) {
+          //ignore
+        }
+      }
+    }
+  }
+
+  private FSDataInputStream getJobInfoFile(String jobId) throws IOException {
+    Path jobStatusFile = getInfoFilePath(jobId);
+    return (fs.exists(jobStatusFile)) ? fs.open(jobStatusFile) : null;
+  }
+
+  private JobStatus readJobStatus(FSDataInputStream dataIn) throws IOException {
+    JobStatus jobStatus = new JobStatus();
+    jobStatus.readFields(dataIn);
+    return jobStatus;
+  }
+
+  private JobProfile readJobProfile(FSDataInputStream dataIn)
+          throws IOException {
+    JobProfile jobProfile = new JobProfile();
+    jobProfile.readFields(dataIn);
+    return jobProfile;
+  }
+
+  private Counters readCounters(FSDataInputStream dataIn) throws IOException {
+    Counters counters = new Counters();
+    counters.readFields(dataIn);
+    return counters;
+  }
+
+  private TaskCompletionEvent[] readEvents(FSDataInputStream dataIn,
+                                           int offset, int len)
+          throws IOException {
+    int size = dataIn.readInt();
+    if (offset > size) {
+      return TaskCompletionEvent.EMPTY_ARRAY;
+    }
+    if (offset + len > size) {
+      len = size - offset;
+    }
+    TaskCompletionEvent[] events = new TaskCompletionEvent[len];
+    for (int i = 0; i < (offset + len); i++) {
+      TaskCompletionEvent event = new TaskCompletionEvent();
+      event.readFields(dataIn);
+      if (i >= offset) {
+        events[i - offset] = event;
+      }
+    }
+    return events;
+  }
+
+  /**
+   * Retrieves JobStatus information from DFS, stored via the store()
+   * method.
+   *
+   * @param jobId the jobId for which jobStatus is queried
+   * @return JobStatus object, null if not able to retrieve
+   */
+  public JobStatus readJobStatus(String jobId) {
+    JobStatus jobStatus = null;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          jobStatus = readJobStatus(dataIn);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job status : " + ex, ex);
+      }
+    }
+    return jobStatus;
+  }
+
+  /**
+   * Retrieves JobProfile information from DFS, stored via the store()
+   * method.
+   *
+   * @param jobId the jobId for which jobProfile is queried
+   * @return JobProfile object, null if not able to retrieve
+   */
+  public JobProfile readJobProfile(String jobId) {
+    JobProfile jobProfile = null;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          readJobStatus(dataIn);
+          jobProfile = readJobProfile(dataIn);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job profile : " + ex, ex);
+      }
+    }
+    return jobProfile;
+  }
+
+  /**
+   * Retrieves Counters information from DFS, stored via the store()
+   * method.
+   *
+   * @param jobId the jobId for which Counters is queried
+   * @return Counters object, null if not able to retrieve
+   */
+  public Counters readCounters(String jobId) {
+    Counters counters = null;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          readJobStatus(dataIn);
+          readJobProfile(dataIn);
+          counters = readCounters(dataIn);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job counters : " + ex, ex);
+      }
+    }
+    return counters;
+  }
+
+  /**
+   * Retrieves TaskCompletionEvent information from DFS, stored via the
+   * store() method.
+   *
+   * @param jobId       the jobId for which TaskCompletionEvents is queried
+   * @param fromEventId events offset
+   * @param maxEvents   max number of events
+   * @return TaskCompletionEvent[], empty array if not able to retrieve
+   */
+  public TaskCompletionEvent[] readJobTaskCompletionEvents(String jobId,
+                                                               int fromEventId,
+                                                               int maxEvents) {
+    TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          readJobStatus(dataIn);
+          readJobProfile(dataIn);
+          readCounters(dataIn);
+          events = readEvents(dataIn, fromEventId, maxEvents);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job events : " + ex, ex);
+      }
+    }
+    return events;
+  }
+
+}
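
The store writes the records for each job in a fixed order (status, profile, counters, event count, then the events themselves), and the read methods above consume the earlier records to reach the one they need. A minimal sketch of reading one of these .info files directly, assuming a Configuration pointing at the same DFS, the default /jobtracker/jobsInfo directory, and the jobId of a persisted job:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.JobProfile;
    import org.apache.hadoop.mapred.JobStatus;
    import org.apache.hadoop.mapred.TaskCompletionEvent;

    // Sketch: read a persisted job info file in the same order it was written.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream in = fs.open(new Path("/jobtracker/jobsInfo", jobId + ".info"));
    JobStatus status = new JobStatus();
    status.readFields(in);                       // 1. job status
    JobProfile profile = new JobProfile();
    profile.readFields(in);                      // 2. job profile
    Counters counters = new Counters();
    counters.readFields(in);                     // 3. counters
    int numEvents = in.readInt();                // 4. number of completion events
    for (int i = 0; i < numEvents; i++) {
      TaskCompletionEvent event = new TaskCompletionEvent();
      event.readFields(in);                      // 5. each TaskCompletionEvent
    }
    in.close();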

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=611527&r1=611526&r2=611527&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Sat Jan 12 18:49:01 2008
@@ -563,7 +563,10 @@
   ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
   Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
                                                 "expireLaunchingTasks");
-    
+
+  CompletedJobStatusStore completedJobStatusStore = null;
+  Thread completedJobsStoreThread = null;
+
   /**
    * It might seem like a bug to maintain a TreeSet of status objects,
    * which can be updated at any time.  But that's not what happens!  We
@@ -701,6 +704,10 @@
     synchronized (this) {
       state = State.RUNNING;
     }
+
+    //initializes the job status store
+    completedJobStatusStore = new CompletedJobStatusStore(conf);
+
     LOG.info("Starting RUNNING");
   }
 
@@ -726,6 +733,12 @@
     this.taskCommitThread = new TaskCommitQueue();
     this.taskCommitThread.start();
 
+    if (completedJobStatusStore.isActive()) {
+      completedJobsStoreThread = new Thread(completedJobStatusStore,
+                                            "completedjobsStore-housekeeper");
+      completedJobsStoreThread.start();
+    }
+
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
@@ -788,6 +801,16 @@
         ex.printStackTrace();
       }
     }
+    if (this.completedJobsStoreThread != null &&
+        this.completedJobsStoreThread.isAlive()) {
+      LOG.info("Stopping completedJobsStore thread");
+      this.completedJobsStoreThread.interrupt();
+      try {
+        this.completedJobsStoreThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
     LOG.info("stopped all jobtracker services");
     return;
   }
@@ -895,7 +918,7 @@
         LOG.info("Removed completed task '" + taskid + "' from '" + 
                  taskTracker + "'");
       }
-      // Clear 
+      // Clear
       trackerToMarkedTasksMap.remove(taskTracker);
     }
   }
@@ -937,6 +960,9 @@
     // Mark the 'non-running' tasks for pruning
     markCompletedJob(job);
 
+    //persists the job info in DFS
+    completedJobStatusStore.store(job);
+    
     JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
 
     // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
@@ -1611,7 +1637,7 @@
     if (job != null) {
       return job.getProfile();
     } else {
-      return null;
+      return completedJobStatusStore.readJobProfile(jobid);
     }
   }
   public synchronized JobStatus getJobStatus(String jobid) {
@@ -1619,7 +1645,7 @@
     if (job != null) {
       return job.getStatus();
     } else {
-      return null;
+      return completedJobStatusStore.readJobStatus(jobid);
     }
   }
   public synchronized Counters getJobCounters(String jobid) {
@@ -1627,7 +1653,7 @@
     if (job != null) {
       return job.getCounters();
     } else {
-      return null;
+      return completedJobStatusStore.readCounters(jobid);
     }
   }
   public synchronized TaskReport[] getMapTaskReports(String jobid) {
@@ -1679,10 +1705,13 @@
    */
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
                                                                      String jobid, int fromEventId, int maxEvents) throws IOException{
-    TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+    TaskCompletionEvent[] events;
     JobInProgress job = this.jobs.get(jobid);
     if (null != job) {
       events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+    }
+    else {
+      events = completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
     }
     return events;
   }

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=611527&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Sat Jan 12 18:49:01 2008
@@ -0,0 +1,178 @@
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Test case to run a MapReduce job.
+ * <p/>
+ * It runs a 2 node Hadoop MapReduce cluster with a 2 node DFS.
+ * <p/>
+ * The JobConf to use must be obtained via the createJobConf() method.
+ * <p/>
+ * It creates a temporary directory -accessible via getTestRootDir()-
+ * for both input and output.
+ * <p/>
+ * The input directory is accessible via getInputDir() and the output
+ * directory via getOutputDir().
+ * <p/>
+ * The DFS filesystem is formatted before the testcase starts and after it ends.
+ */
+public abstract class ClusterMapReduceTestCase extends TestCase {
+  private MiniDFSCluster dfsCluster = null;
+  private MiniMRCluster mrCluster = null;
+  private FileSystem fileSystem = null;
+
+  /**
+   * Creates Hadoop Cluster and DFS before a test case is run.
+   *
+   * @throws Exception
+   */
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    startCluster(true, null);
+  }
+
+  /**
+   * Starts the cluster within a testcase.
+   * <p/>
+   * Note that the cluster is already started when the testcase method
+   * is invoked. This method is useful if, as part of the testcase, the
+   * cluster has to be shut down and restarted.
+   * <p/>
+   * If the cluster is already running this method does nothing.
+   *
+   * @param reformatDFS indicates if DFS has to be reformatted
+   * @param props configuration properties to inject into the mini cluster
+   * @throws Exception if the cluster could not be started
+   */
+  protected synchronized void startCluster(boolean reformatDFS, Properties props)
+          throws Exception {
+    if (dfsCluster == null) {
+      JobConf conf = new JobConf();
+      if (props != null) {
+        for (Map.Entry entry : props.entrySet()) {
+          conf.set((String) entry.getKey(), (String) entry.getValue());
+        }
+      }
+      dfsCluster = new MiniDFSCluster(conf, 2, reformatDFS, null);
+      fileSystem = dfsCluster.getFileSystem();
+
+      ConfigurableMiniMRCluster.setConfiguration(props);
+      //noinspection deprecation
+      mrCluster = new ConfigurableMiniMRCluster(2, fileSystem.getName(), 1);
+    }
+  }
+
+  private static class ConfigurableMiniMRCluster extends MiniMRCluster {
+    private static Properties config;
+
+    public static void setConfiguration(Properties props) {
+      config = props;
+    }
+
+    public ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
+                                     int numDir) throws Exception {
+      super(numTaskTrackers, namenode, numDir);
+    }
+
+    public JobConf createJobConf() {
+      JobConf conf = super.createJobConf();
+      if (config != null) {
+        for (Map.Entry entry : config.entrySet()) {
+          conf.set((String) entry.getKey(), (String) entry.getValue());
+        }
+      }
+      return conf;
+    }
+  }
+
+  /**
+   * Stops the cluster within a testcase.
+   * <p/>
+   * Note that the cluster is already started when the testcase method
+   * is invoked. This method is useful if, as part of the testcase, the
+   * cluster has to be shut down.
+   * <p/>
+   * If the cluster is already stopped this method does nothing.
+   *
+   * @throws Exception if the cluster could not be stopped
+   */
+  protected void stopCluster() throws Exception {
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+      mrCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+      fileSystem = null;
+    }
+  }
+
+  /**
+   * Destroys Hadoop Cluster and DFS after a test case is run.
+   *
+   * @throws Exception
+   */
+  protected void tearDown() throws Exception {
+    stopCluster();
+    super.tearDown();
+  }
+
+  /**
+   * Returns a preconfigured FileSystem instance that test cases can use to
+   * read and write files.
+   * <p/>
+   * TestCases should use this Filesystem instance.
+   *
+   * @return the filesystem used by Hadoop.
+   */
+  protected FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  /**
+   * Returns the path to the root directory for the testcase.
+   *
+   * @return path to the root directory for the testcase.
+   */
+  protected Path getTestRootDir() {
+    return new Path("x").getParent();
+  }
+
+  /**
+   * Returns a path to the input directory for the testcase.
+   *
+   * @return path to the input directory for the testcase.
+   */
+  protected Path getInputDir() {
+    return new Path("input");
+  }
+
+  /**
+   * Returns a path to the output directory for the testcase.
+   *
+   * @return path to the output directory for the testcase.
+   */
+  protected Path getOutputDir() {
+    return new Path("output");
+  }
+
+  /**
+   * Returns a job configuration preconfigured to run against the Hadoop
+   * managed by the testcase.
+   *
+   * @return configuration that works on the testcase Hadoop instance
+   */
+  protected JobConf createJobConf() {
+    return mrCluster.createJobConf();
+  }
+
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java?rev=611527&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java Sat Jan 12 18:49:01 2008
@@ -0,0 +1,139 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.Properties;
+
+public class TestClusterMapReduceTestCase extends ClusterMapReduceTestCase {
+
+  public static class EchoMap implements Mapper {
+
+    public void configure(JobConf conf) {
+    }
+
+    public void close() {
+    }
+
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector collector, Reporter reporter) throws IOException {
+      collector.collect(key, value);
+    }
+  }
+
+  public static class EchoReduce implements Reducer {
+
+    public void configure(JobConf conf) {
+    }
+
+    public void close() {
+    }
+
+    public void reduce(WritableComparable key, Iterator values,
+                       OutputCollector collector, Reporter reporter) throws IOException {
+      while (values.hasNext()) {
+        Writable value = (Writable) values.next();
+        collector.collect(key, value);
+      }
+    }
+
+  }
+
+  public void _testMapReduce(boolean restart) throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("hello1\n");
+    wr.write("hello2\n");
+    wr.write("hello3\n");
+    wr.write("hello4\n");
+    wr.close();
+
+    if (restart) {
+      stopCluster();
+      startCluster(false, null);
+    }
+    
+    JobConf conf = createJobConf();
+    conf.setJobName("mr");
+
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapperClass(TestClusterMapReduceTestCase.EchoMap.class);
+    conf.setReducerClass(TestClusterMapReduceTestCase.EchoReduce.class);
+
+    conf.setInputPath(getInputDir());
+
+    conf.setOutputPath(getOutputDir());
+
+
+    JobClient.runJob(conf);
+
+    Path[] outputFiles = getFileSystem().listPaths(getOutputDir());
+
+    if (outputFiles.length > 0) {
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      String line = reader.readLine();
+      int counter = 0;
+      while (line != null) {
+        counter++;
+        assertTrue(line.contains("hello"));
+        line = reader.readLine();
+      }
+      reader.close();
+      assertEquals(4, counter);
+    }
+
+  }
+
+  public void testMapReduce() throws Exception {
+    _testMapReduce(false);
+  }
+
+  public void testMapReduceRestarting() throws Exception {
+    _testMapReduce(true);
+  }
+
+  public void testDFSRestart() throws Exception {
+    Path file = new Path(getInputDir(), "text.txt");
+    OutputStream os = getFileSystem().create(file);
+    Writer wr = new OutputStreamWriter(os);
+    wr.close();
+
+    stopCluster();
+    startCluster(false, null);
+    assertTrue(getFileSystem().exists(file));
+
+    stopCluster();
+    startCluster(true, null);
+    assertFalse(getFileSystem().exists(file));
+    
+  }
+
+  public void testMRConfig() throws Exception {
+    JobConf conf = createJobConf();
+    assertNull(conf.get("xyz"));
+
+    Properties config = new Properties();
+    config.setProperty("xyz", "XYZ");
+    stopCluster();
+    startCluster(false, config);
+
+    conf = createJobConf();
+    assertEquals("XYZ", conf.get("xyz"));
+  }
+
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=611527&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java Sat Jan 12 18:49:01 2008
@@ -0,0 +1,124 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.Path;
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.Properties;
+
+public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
+
+  public static class EchoMap implements Mapper {
+
+    public void configure(JobConf conf) {
+    }
+
+    public void close() {
+    }
+
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector collector, Reporter reporter) throws IOException {
+      collector.collect(key, value);
+    }
+  }
+
+  public static class EchoReduce implements Reducer {
+
+    public void configure(JobConf conf) {
+    }
+
+    public void close() {
+    }
+
+    public void reduce(WritableComparable key, Iterator values,
+                       OutputCollector collector, Reporter reporter) throws IOException {
+      while (values.hasNext()) {
+        Writable value = (Writable) values.next();
+        collector.collect(key, value);
+      }
+    }
+
+  }
+
+  private String runJob() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("hello1\n");
+    wr.write("hello2\n");
+    wr.write("hello3\n");
+    wr.write("hello4\n");
+    wr.close();
+
+    JobConf conf = createJobConf();
+    conf.setJobName("mr");
+
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapperClass(TestJobStatusPersistency.EchoMap.class);
+    conf.setReducerClass(TestJobStatusPersistency.EchoReduce.class);
+
+    conf.setInputPath(getInputDir());
+
+    conf.setOutputPath(getOutputDir());
+
+    return JobClient.runJob(conf).getJobID();
+  }
+
+  public void testNonPersistency() throws Exception {
+    String jobId = runJob();
+    JobClient jc = new JobClient(createJobConf());
+    RunningJob rj = jc.getJob(jobId);
+    assertNotNull(rj);
+    stopCluster();
+    startCluster(false, null);
+    jc = new JobClient(createJobConf());
+    rj = jc.getJob(jobId);
+    assertNull(rj);
+  }
+
+  public void testPersistency() throws Exception {
+    Properties config = new Properties();
+    config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
+    config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
+    stopCluster();
+    startCluster(false, config);
+    String jobId = runJob();
+    JobClient jc = new JobClient(createJobConf());
+    RunningJob rj0 = jc.getJob(jobId);
+    assertNotNull(rj0);
+    boolean successful0 = rj0.isSuccessful();
+    String jobName0 = rj0.getJobName();
+    Counters counters0 = rj0.getCounters();
+    TaskCompletionEvent[] events0 = rj0.getTaskCompletionEvents(0);
+
+    stopCluster();
+    startCluster(false, config);
+     
+    jc = new JobClient(createJobConf());
+    RunningJob rj1 = jc.getJob(jobId);
+    assertNotNull(rj1);
+    assertEquals(successful0, rj1.isSuccessful());
+    assertEquals(jobName0, rj1.getJobName());
+    assertEquals(counters0.size(), rj1.getCounters().size());
+
+    TaskCompletionEvent[] events1 = rj1.getTaskCompletionEvents(0);
+    assertEquals(events0.length, events1.length);    
+    for (int i = 0; i < events0.length; i++) {
+      assertEquals(events0[i].getTaskId(), events1[i].getTaskId());
+      assertEquals(events0[i].getTaskStatus(), events1[i].getTaskStatus());
+    }
+  }
+
+}

