Author: cutting
Date: Tue Aug 15 14:55:52 2006
New Revision: 431715

URL: http://svn.apache.org/viewvc?rev=431715&view=rev
Log:
HADOOP-322.  Add a job control utility.  Contributed by Runping.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java
    
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java
    
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/
    
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=431715&r1=431714&r2=431715&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Aug 15 14:55:52 2006
@@ -46,6 +46,11 @@
     critical percentage of the datanodes are unavailable.
     (Konstantin Shvachko via cutting)
 
+11. HADOOP-322.  Add a job control utility.  This permits one to
+    specify job interdependencies.  Each job is submitted only after
+    the jobs it depends on have successfully completed.
+    (Runping Qi via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java?rev=431715&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java 
(added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java 
Tue Aug 15 14:55:52 2006
@@ -0,0 +1,331 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.jobcontrol;
+
+
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.ArrayList;
+import java.io.IOException;
+
+/**
+ * This class encapsulates a MapReduce job and its dependencies. It monitors
+ * the states of the depending jobs and updates the state of this job.
+ * A job starts in the WAITING state. If it does not have any depending jobs,
+ * or all of the depending jobs are in the SUCCESS state, then the job state
+ * will become READY. If any depending job fails, the job will fail too
+ * (DEPENDENT_FAILED). When in the READY state, the job can be submitted to
+ * Hadoop for execution, with the state changing into RUNNING. From RUNNING,
+ * the job can get into the SUCCESS or FAILED state, depending on the status
+ * of the job execution.
+ */
+
+public class Job {
+
+  // A job will be in one of the following states
+  final public static int SUCCESS = 0;
+  final public static int WAITING = 1;
+  final public static int RUNNING = 2;
+  final public static int READY = 3;
+  final public static int FAILED = 4;
+  final public static int DEPENDENT_FAILED = 5;
+
+  private JobConf theJobConf;
+  private int state;
+  private String jobID;            // assigned and used by JobControl class
+  private String mapredJobID;      // the job ID assigned by map/reduce
+  private String jobName;          // external name, assigned/used by client app
+  private String message;          // some info for human consumption,
+                                   // e.g. the reason why the job failed
+  private ArrayList dependingJobs; // the jobs the current job depends on
+                                   // (may be null)
+
+  private JobClient jc = null;     // the map reduce job client
+
+  /**
+   * Construct a job in the WAITING state.
+   * @param jobConf a mapred job configuration representing a job to be
+   *                executed.
+   * @param dependingJobs a list of {@link Job}s the current job depends on;
+   *                      may be null or empty if there are none
+   * @throws IOException if the {@link JobClient} for jobConf cannot be
+   *                     created
+   */
+  public Job(JobConf jobConf, ArrayList dependingJobs) throws IOException {
+    this.theJobConf = jobConf;
+    this.dependingJobs = dependingJobs;
+    this.state = Job.WAITING;
+    this.jobID = "unassigned";
+    this.mapredJobID = "unassigned";
+    this.jobName = "unassigned";
+    this.message = "just initialized";
+    this.jc = new JobClient(jobConf);
+  }
+
+  /**
+   * Returns a multi-line, human-readable summary of this job: its name,
+   * IDs, numeric state, message and the names of its depending jobs.
+   */
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("job name:\t").append(this.jobName).append("\n");
+    sb.append("job id:\t").append(this.jobID).append("\n");
+    sb.append("job state:\t").append(this.state).append("\n");
+    sb.append("job mapred id:\t").append(this.mapredJobID).append("\n");
+    sb.append("job message:\t").append(this.message).append("\n");
+
+    if (this.dependingJobs == null) {
+      sb.append("job has no depending job:\t").append("\n");
+    } else {
+      sb.append("job has ").append(this.dependingJobs.size())
+        .append(" dependeng jobs:\n");
+      for (int i = 0; i < this.dependingJobs.size(); i++) {
+        sb.append("\t depending job ").append(i).append(":\t");
+        sb.append(((Job) this.dependingJobs.get(i)).getJobName()).append("\n");
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * @return the job name of this job
+   */
+  public String getJobName() {
+    return this.jobName;
+  }
+
+  /**
+   * Set the job name for this job.
+   * @param jobName the job name
+   */
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  /**
+   * @return the job ID of this job (assigned by JobControl)
+   */
+  public String getJobID() {
+    return this.jobID;
+  }
+
+  /**
+   * Set the job ID for this job.
+   * @param id the job ID
+   */
+  public void setJobID(String id) {
+    this.jobID = id;
+  }
+
+  /**
+   * @return the mapred ID of this job
+   */
+  public String getMapredJobID() {
+    return this.mapredJobID;
+  }
+
+  /**
+   * Set the mapred ID for this job.
+   * @param mapredJobID the mapred job ID for this job.
+   */
+  public void setMapredJobID(String mapredJobID) {
+    // Previously this overwrote jobID by mistake.
+    this.mapredJobID = mapredJobID;
+  }
+
+  /**
+   * @return the mapred job conf of this job
+   */
+  public JobConf getJobConf() {
+    return this.theJobConf;
+  }
+
+  /**
+   * Set the mapred job conf for this job.
+   * @param jobConf the mapred job conf for this job.
+   */
+  public void setJobConf(JobConf jobConf) {
+    this.theJobConf = jobConf;
+  }
+
+  /**
+   * @return the state of this job
+   */
+  public int getState() {
+    return this.state;
+  }
+
+  /**
+   * Set the state for this job.
+   * @param state the new state for this job.
+   */
+  public void setState(int state) {
+    this.state = state;
+  }
+
+  /**
+   * @return the message of this job
+   */
+  public String getMessage() {
+    return this.message;
+  }
+
+  /**
+   * Set the message for this job.
+   * @param message the message for this job.
+   */
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  /**
+   * @return the depending jobs of this job (may be null)
+   */
+  public ArrayList getDependingJobs() {
+    return this.dependingJobs;
+  }
+
+  /**
+   * @return true if this job is in a complete state
+   *         (FAILED, DEPENDENT_FAILED or SUCCESS)
+   */
+  public boolean isCompleted() {
+    return this.state == Job.FAILED ||
+           this.state == Job.DEPENDENT_FAILED ||
+           this.state == Job.SUCCESS;
+  }
+
+  /**
+   * @return true if this job is in READY state
+   */
+  public boolean isReady() {
+    return this.state == Job.READY;
+  }
+
+  /**
+   * Check the state of this running job. The state may
+   * remain the same, become SUCCESS or FAILED.
+   */
+  private void checkRunningState() {
+    RunningJob running = null;
+    try {
+      running = jc.getJob(this.mapredJobID);
+      if (running == null) {
+        // NOTE(review): getJob appears able to return null when the job is
+        // unknown to the job tracker; record a failure instead of letting a
+        // NullPointerException escape to the caller (e.g. JobControl's thread).
+        this.state = Job.FAILED;
+        this.message = "Job " + this.mapredJobID + " could not be found";
+        releaseAfterFailure(null);
+        return;
+      }
+      if (running.isComplete()) {
+        if (running.isSuccessful()) {
+          this.state = Job.SUCCESS;
+        } else {
+          this.state = Job.FAILED;
+          this.message = "Job failed!";
+          releaseAfterFailure(running);
+        }
+      }
+    } catch (IOException ioe) {
+      this.state = Job.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+      // running is still null here if getJob itself threw
+      releaseAfterFailure(running);
+    }
+  }
+
+  /**
+   * Best-effort cleanup after this job has been marked FAILED: kill the
+   * mapred job (when a handle to it is available) and close the job client.
+   * IOExceptions from either step are deliberately ignored because the
+   * failure has already been recorded in state/message.
+   * @param running the mapred job handle, or null if none was obtained
+   */
+  private void releaseAfterFailure(RunningJob running) {
+    if (running != null) {
+      try {
+        running.killJob();
+      } catch (IOException ignored) {
+        // best effort
+      }
+    }
+    try {
+      this.jc.close();
+    } catch (IOException ignored) {
+      // best effort
+    }
+  }
+
+  /**
+   * Check and update the state of this job. The state changes
+   * depending on its current state and the states of the depending jobs.
+   * Checking a WAITING job calls checkState() on its depending jobs in
+   * order, stopping at the first one that has not yet succeeded, so this
+   * may update the states of those jobs as well.
+   * @return the (possibly updated) state of this job
+   */
+  public int checkState() {
+    if (this.state == Job.RUNNING) {
+      checkRunningState();
+    }
+    if (this.state != Job.WAITING) {
+      return this.state;
+    }
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      this.state = Job.READY;
+      return this.state;
+    }
+    int n = this.dependingJobs.size();
+    for (int i = 0; i < n; i++) {
+      Job pred = (Job) this.dependingJobs.get(i);
+      int s = pred.checkState();
+      if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
+        break; // a pred is still not completed, continue in WAITING state
+      }
+      if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
+        this.state = Job.DEPENDENT_FAILED;
+        this.message = "depending job " + i + " with jobID "
+            + pred.getJobID() + " failed. " + pred.getMessage();
+        break;
+      }
+      // pred must be in success state
+      if (i == n - 1) {
+        this.state = Job.READY;
+      }
+    }
+    return this.state;
+  }
+
+  /**
+   * Submit this job to mapred. The state becomes RUNNING if submission
+   * is successful, FAILED otherwise (with the exception text stored as the
+   * message). If the boolean property "create.empty.dir.if.nonexist" is
+   * true in the job conf, input paths that do not exist are created first.
+   */
+  public void submit() {
+    try {
+      if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
+        FileSystem fs = FileSystem.get(theJobConf);
+        Path[] inputPaths = theJobConf.getInputPaths();
+        for (int i = 0; i < inputPaths.length; i++) {
+          if (!fs.exists(inputPaths[i])) {
+            try {
+              fs.mkdirs(inputPaths[i]);
+            } catch (IOException ignored) {
+              // best effort; a still-missing input dir will presumably
+              // surface as a submission or job failure
+            }
+          }
+        }
+      }
+      RunningJob running = jc.submitJob(theJobConf);
+      this.mapredJobID = running.getJobID();
+      this.state = Job.RUNNING;
+    } catch (IOException ioe) {
+      this.state = Job.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+    }
+  }
+
+  /**
+   * Placeholder entry point; currently does nothing.
+   * @param args ignored
+   */
+  public static void main(String[] args) {
+  }
+
+}

Added: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java?rev=431715&view=auto
==============================================================================
--- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java
 (added)
+++ 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java
 Tue Aug 15 14:55:52 2006
@@ -0,0 +1,299 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.jobcontrol;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+
+/** This class encapsulates a set of MapReduce jobs and its dependency. It 
tracks 
+ *  the states of the jobs by placing them into different tables according to 
their 
+ *  states. 
+ *  
+ *  This class provides APIs for the client app to add a job to the group and 
to get 
+ *  the jobs in the group in different states. When a 
+ *  job is added, an ID unique to the group is assigned to the job. 
+ *  
+ *  This class has a thread that submits jobs when they become ready, monitors 
the
+ *  states of the running jobs, and updates the states of jobs based on the 
state changes 
+ *  of their depending jobs states. The class provides APIs for 
suspending/resuming
+ *  the thread,and for stopping the thread.
+ *  
+ */
+public class JobControl implements Runnable{
+
+       // The thread can be in one of the following state
+       private static final int RUNNING = 0;
+       private static final int SUSPENDED = 1;
+       private static final int STOPPED = 2;
+       private static final int STOPPING = 3;
+       private static final int READY = 4;
+       
+       private int runnerState;                        // the thread state
+       
+       private Hashtable waitingJobs;
+       private Hashtable readyJobs;
+       private Hashtable runningJobs;
+       private Hashtable successfulJobs;
+       private Hashtable failedJobs;
+       
+       private long nextJobID;
+       private String groupName;
+       
+       /** 
+        * Construct a job control for a group of jobs.
+        * @param groupName a name identifying this group
+        */
+       public JobControl(String groupName) {
+               this.waitingJobs = new Hashtable();
+               this.readyJobs = new Hashtable();
+               this.runningJobs = new Hashtable();
+               this.successfulJobs = new Hashtable();
+               this.failedJobs = new Hashtable();
+               this.nextJobID = -1;
+               this.groupName = groupName;
+               this.runnerState = JobControl.READY;
+               
+       }
+       
+       private static ArrayList toArrayList(Hashtable jobs) {
+               ArrayList retv = new ArrayList();
+               Iterator iter = jobs.values().iterator();
+               while (iter.hasNext()) {
+                       retv.add(iter.next());
+               }
+               return retv;
+       }
+       
+       /**
+        * @return the jobs in the waiting state
+        */
+       public ArrayList getWaitingJobs() {
+               return JobControl.toArrayList(this.waitingJobs);
+       }
+       
+       /**
+        * @return the jobs in the running state
+        */
+       public ArrayList getRunningJobs() {
+               return JobControl.toArrayList(this.runningJobs);
+       }
+       
+       /**
+        * @return the jobs in the ready state
+        */
+       public ArrayList getReadyJobs() {
+               return JobControl.toArrayList(this.readyJobs);
+       }
+       
+       /**
+        * @return the jobs in the success state
+        */
+       public ArrayList getSuccessfulJobs() {
+               return JobControl.toArrayList(this.successfulJobs);
+       }
+       
+       public ArrayList getFailedJobs() {
+               return JobControl.toArrayList(this.failedJobs);
+       }
+       
+       private String getNextJobID() {
+               nextJobID += 1;
+               return this.groupName + this.nextJobID;
+       }
+       
+       private static void addToQueue(Job aJob, Hashtable queue) {
+               synchronized(queue) {
+                       queue.put(aJob.getJobID(), aJob);
+               }               
+       }
+       
+       private void addToQueue(Job aJob) {
+               Hashtable queue = getQueue(aJob.getState());
+               addToQueue(aJob, queue);        
+       }
+       
+       private Hashtable getQueue(int state) {
+               Hashtable retv = null;
+               if (state == Job.WAITING) {
+                       retv = this.waitingJobs;
+               } else if (state == Job.READY) {
+                       retv = this.readyJobs;
+               } else if (state == Job.RUNNING) {
+                       retv = this.runningJobs;
+               } else if (state == Job.SUCCESS) {
+                       retv = this.successfulJobs;
+               } else if (state == Job.FAILED || state == 
Job.DEPENDENT_FAILED) {
+                       retv = this.failedJobs;
+               } 
+               return retv;
+                       
+       }
+
+       /**
+        * Add a new job.
+        * @param aJob the the new job
+        */
+       synchronized public String addJob(Job aJob) {
+               String id = this.getNextJobID();
+               aJob.setJobID(id);
+               aJob.setState(Job.WAITING);
+               this.addToQueue(aJob);
+               return id;      
+       }
+       
+       /**
+        * @param args
+        */
+       public static void main(String[] args) {
+               // TODO Auto-generated method stub
+
+       }
+
+       /**
+        * @return the thread state
+        */
+       public int getState() {
+               return this.runnerState;
+       }
+       
+       /**
+        * set the thread state to STOPPING so that the 
+        * thread will stop when it wakes up.
+        */
+       public void stop() {
+               this.runnerState = JobControl.STOPPING;
+       }
+       
+       /**
+        * suspend the running thread
+        */
+       public void suspend () {
+               if (this.runnerState == JobControl.RUNNING) {
+                       this.runnerState = JobControl.SUSPENDED;
+               }
+       }
+       
+       /**
+        * resume the suspended thread
+        */
+       public void resume () {
+               if (this.runnerState == JobControl.SUSPENDED) {
+                       this.runnerState = JobControl.RUNNING;
+               }
+       }
+       
+       synchronized private void checkRunningJobs() {
+               
+               Hashtable oldJobs = null;
+               oldJobs = this.runningJobs;
+               this.runningJobs = new Hashtable();
+               
+               Iterator jobs = oldJobs.values().iterator();
+               while (jobs.hasNext()) {
+                       Job nextJob = (Job)jobs.next();
+                       int state = nextJob.checkState();
+                       /*
+                       if (state != Job.RUNNING) {
+                               System.out.println("The state of the running 
job " +
+                                       nextJob.getJobName() + " has changed 
to: " + nextJob.getState());
+                       }
+                       */
+                       this.addToQueue(nextJob);
+               }
+       }
+       
+       synchronized private void checkWaitingJobs() {
+               Hashtable oldJobs = null;
+               oldJobs = this.waitingJobs;
+               this.waitingJobs = new Hashtable();
+               
+               Iterator jobs = oldJobs.values().iterator();
+               while (jobs.hasNext()) {
+                       Job nextJob = (Job)jobs.next();
+                       int state = nextJob.checkState();
+                       /*
+                       if (state != Job.WAITING) {
+                               System.out.println("The state of the waiting 
job " +
+                                       nextJob.getJobName() + " has changed 
to: " + nextJob.getState());
+                       }
+                       */
+                       this.addToQueue(nextJob);
+               }
+       }
+       
+       synchronized private void startReadyJobs() {
+               Hashtable oldJobs = null;
+               oldJobs = this.readyJobs;
+               this.readyJobs = new Hashtable();
+               
+               Iterator jobs = oldJobs.values().iterator();
+               while (jobs.hasNext()) {
+                       Job nextJob = (Job)jobs.next();
+                       //System.out.println("Job to submit to Hadoop: " + 
nextJob.getJobName());
+                       nextJob.submit();
+                       //System.out.println("Hadoop ID: " + 
nextJob.getMapredJobID());
+                       this.addToQueue(nextJob);
+               }       
+       }
+       
+       synchronized public boolean allFinished() {
+               return this.waitingJobs.size() == 0 &&
+                       this.readyJobs.size() == 0 &&
+                       this.runningJobs.size() == 0;
+       }
+       
+       /**
+        *  The main loop for the thread.
+        *  The loop does the following:
+        *      Check the states of the running jobs
+        *      Update the states of waiting jobs
+        *      Submit the jobs in ready state
+        */
+       public void run() {
+               this.runnerState = JobControl.RUNNING;
+               while (true) {
+                       while (this.runnerState == JobControl.SUSPENDED) {
+                               try {
+                                       Thread.sleep(5000);
+                               }
+                               catch (Exception e) {
+                                       
+                               }
+                       }
+                       checkRunningJobs();     
+                       checkWaitingJobs();             
+                       startReadyJobs();               
+                       if (this.runnerState != JobControl.RUNNING && 
+                                       this.runnerState != 
JobControl.SUSPENDED) {
+                               break;
+                       }
+                       try {
+                               Thread.sleep(5000);
+                       }
+                       catch (Exception e) {
+                               
+                       }
+                       if (this.runnerState != JobControl.RUNNING && 
+                                       this.runnerState != 
JobControl.SUSPENDED) {
+                               break;
+                       }
+               }
+               this.runnerState = JobControl.STOPPED;
+       }
+
+}

Added: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html?rev=431715&view=auto
==============================================================================
--- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html 
(added)
+++ 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html 
Tue Aug 15 14:55:52 2006
@@ -0,0 +1,7 @@
+<html>
+<body>
+
+<p>Utilities for managing dependent jobs.</p>
+
+</body>
+</html>

Added: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java?rev=431715&view=auto
==============================================================================
--- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
 (added)
+++ 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
 Tue Aug 15 14:55:52 2006
@@ -0,0 +1,297 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.jobcontrol;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Unit test for the Job/JobControl classes.
+ *
+ * The test builds a small dependency graph of copy jobs, runs them through a
+ * JobControl instance on a separate thread, and then checks that every job
+ * ended in a terminal state that is consistent with the states of the jobs
+ * it depends on.
+ *
+ * @author runping
+ *
+ */
+public class TestJobControl extends junit.framework.TestCase {
+
+    // Formats random longs with at least 4 digits and no grouping separators.
+    private static NumberFormat idFormat = NumberFormat.getInstance();
+    static {
+        idFormat.setMinimumIntegerDigits(4);
+        idFormat.setGroupingUsed(false);
+    }
+
+    static private Random rand = new Random();
+
+    /** Deletes dirPath via FileSystem.delete so a run starts from a clean slate. */
+    private static void cleanData(FileSystem fs, Path dirPath)
+            throws IOException {
+        fs.delete(dirPath);
+    }
+
+    /** Returns one random long, formatted with idFormat. */
+    private static String generateRandomWord() {
+        return idFormat.format(rand.nextLong());
+    }
+
+    /**
+     * Returns a newline-terminated line of space-separated random words.
+     * The word count is 20 + (rand.nextLong() % 7); because the remainder of
+     * a negative long is negative, this yields between 14 and 26 words.
+     */
+    private static String generateRandomLine() {
+        long r = rand.nextLong() % 7;
+        long n = r + 20;
+        StringBuffer sb = new StringBuffer();
+        for (int i = 0; i < n; i++) {
+            sb.append(generateRandomWord()).append(" ");
+        }
+        sb.append("\n");
+        return sb.toString();
+    }
+
+    /**
+     * Writes 100000 random lines (UTF-8) to dirPath/data.txt.
+     * The output stream is closed even if a write fails.
+     */
+    private static void generateData(FileSystem fs, Path dirPath)
+            throws IOException {
+        FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt"));
+        try {
+            for (int i = 0; i < 100000; i++) {
+                String line = TestJobControl.generateRandomLine();
+                out.write(line.getBytes("UTF-8"));
+            }
+        } finally {
+            out.close();
+        }
+    }
+
+    /**
+     * Mapper/reducer used by every job in this test.
+     * map re-emits each value under key.toString() wrapped in a Text;
+     * reduce emits every value under an empty Text key.
+     */
+    public static class DataCopy extends MapReduceBase implements Mapper,
+            Reducer {
+        public void map(WritableComparable key, Writable value,
+                OutputCollector output, Reporter reporter) throws IOException {
+            output.collect(new Text(key.toString()), value);
+        }
+
+        public void reduce(WritableComparable key, Iterator values,
+                OutputCollector output, Reporter reporter) throws IOException {
+            Text dumbKey = new Text("");
+            while (values.hasNext()) {
+                Text data = (Text) values.next();
+                output.collect(dumbKey, data);
+            }
+        }
+    }
+
+    /**
+     * Builds a "DataMoveJob" configuration that runs DataCopy over the given
+     * inputs and writes Text/Text output to outdir (12 map tasks, 4 reduce tasks).
+     *
+     * @param indirs non-empty list of input Paths; the first is passed to
+     *        setInputPath and the rest to addInputPath
+     * @param outdir output directory of the job
+     * @return the configured job
+     */
+    private static JobConf createCopyJob(ArrayList indirs, Path outdir)
+            throws Exception {
+
+        Configuration defaults = new Configuration();
+        JobConf theJob = new JobConf(defaults, TestJobControl.class);
+        theJob.setJobName("DataMoveJob");
+
+        theJob.setInputPath((Path) indirs.get(0));
+        for (int i = 1; i < indirs.size(); i++) {
+            theJob.addInputPath((Path) indirs.get(i));
+        }
+        theJob.setMapperClass(DataCopy.class);
+        theJob.setOutputPath(outdir);
+        theJob.setOutputKeyClass(Text.class);
+        theJob.setOutputValueClass(Text.class);
+        theJob.setReducerClass(DataCopy.class);
+        theJob.setNumMapTasks(12);
+        theJob.setNumReduceTasks(4);
+        return theJob;
+    }
+
+    /** Prints how many jobs the controller currently holds in each state. */
+    private static void printJobStates(JobControl theControl) {
+        System.out.println("Jobs in waiting state: "
+                + theControl.getWaitingJobs().size());
+        System.out.println("Jobs in ready state: "
+                + theControl.getReadyJobs().size());
+        System.out.println("Jobs in running state: "
+                + theControl.getRunningJobs().size());
+        System.out.println("Jobs in success state: "
+                + theControl.getSuccessfulJobs().size());
+        System.out.println("Jobs in failed state: "
+                + theControl.getFailedJobs().size());
+        System.out.println("\n");
+    }
+
+    /** Returns a "name:  state" line used in failure messages. */
+    private static String describe(String name, Job job) {
+        return name + ":  " + job.getState() + "\n";
+    }
+
+    /** True if the job ended in FAILED or DEPENDENT_FAILED. */
+    private static boolean hasFailed(Job job) {
+        return job.getState() == Job.FAILED
+                || job.getState() == Job.DEPENDENT_FAILED;
+    }
+
+    /** Throws unless the job is in FAILED, DEPENDENT_FAILED or SUCCESS. */
+    private static void checkComplete(String name, Job job) throws Exception {
+        if (!hasFailed(job) && job.getState() != Job.SUCCESS) {
+            throw new Exception("The state of " + name
+                    + " is not in a complete state\n" + describe(name, job));
+        }
+    }
+
+    /**
+     * Runs the JobControl end-to-end test.
+     * It first cleans all the dirs it will use (under the directory named by
+     * the "test.build.data" system property, default "."). Then it generates
+     * some random text data in TestJobControlData/indir. Then it creates 4 jobs:
+     *      Job 1: copy data from indir to outdir_1
+     *      Job 2: copy data from indir to outdir_2
+     *      Job 3: copy data from outdir_1 and outdir_2 to outdir_3
+     *      Job 4: copy data from outdir_3 to outdir_4
+     * The jobs 1 and 2 have no dependency. The job 3 depends on jobs 1 and 2.
+     * The job 4 depends on job 3.
+     *
+     * Then it creates a JobControl object and adds the 4 jobs to it. Finally,
+     * it runs the JobControl object on a separate thread, reports the job
+     * states every 5 seconds until all jobs are finished, and verifies the
+     * final states. The JobControl is stopped even if a check fails.
+     *
+     * @throws Exception if a job did not reach a terminal state, if the job
+     *         states are inconsistent with their dependencies, if the wait is
+     *         interrupted, or if file system / job setup fails
+     */
+    public static void doJobControlTest() throws Exception {
+
+        Configuration defaults = new Configuration();
+        FileSystem fs = FileSystem.get(defaults);
+        Path rootDataDir = new Path(System.getProperty("test.build.data", "."),
+                "TestJobControlData");
+        Path indir = new Path(rootDataDir, "indir");
+        Path outdir_1 = new Path(rootDataDir, "outdir_1");
+        Path outdir_2 = new Path(rootDataDir, "outdir_2");
+        Path outdir_3 = new Path(rootDataDir, "outdir_3");
+        Path outdir_4 = new Path(rootDataDir, "outdir_4");
+
+        cleanData(fs, indir);
+        generateData(fs, indir);
+
+        cleanData(fs, outdir_1);
+        cleanData(fs, outdir_2);
+        cleanData(fs, outdir_3);
+        cleanData(fs, outdir_4);
+
+        ArrayList dependingJobs = null;
+
+        ArrayList inPaths_1 = new ArrayList();
+        inPaths_1.add(indir);
+        JobConf jobConf_1 = createCopyJob(inPaths_1, outdir_1);
+        Job job_1 = new Job(jobConf_1, dependingJobs);
+        ArrayList inPaths_2 = new ArrayList();
+        inPaths_2.add(indir);
+        JobConf jobConf_2 = createCopyJob(inPaths_2, outdir_2);
+        Job job_2 = new Job(jobConf_2, dependingJobs);
+
+        ArrayList inPaths_3 = new ArrayList();
+        inPaths_3.add(outdir_1);
+        inPaths_3.add(outdir_2);
+        JobConf jobConf_3 = createCopyJob(inPaths_3, outdir_3);
+        dependingJobs = new ArrayList();
+        dependingJobs.add(job_1);
+        dependingJobs.add(job_2);
+        Job job_3 = new Job(jobConf_3, dependingJobs);
+
+        ArrayList inPaths_4 = new ArrayList();
+        inPaths_4.add(outdir_3);
+        JobConf jobConf_4 = createCopyJob(inPaths_4, outdir_4);
+        dependingJobs = new ArrayList();
+        dependingJobs.add(job_3);
+        Job job_4 = new Job(jobConf_4, dependingJobs);
+
+        JobControl theControl = new JobControl("Test");
+        theControl.addJob(job_1);
+        theControl.addJob(job_2);
+        theControl.addJob(job_3);
+        theControl.addJob(job_4);
+
+        Thread theController = new Thread(theControl);
+        theController.start();
+        try {
+            while (!theControl.allFinished()) {
+                printJobStates(theControl);
+                // An interrupt propagates as InterruptedException instead of
+                // being swallowed; the finally clause still stops the control.
+                Thread.sleep(5000);
+            }
+            System.out.println("Jobs are all done???");
+            printJobStates(theControl);
+
+            checkComplete("job_1", job_1);
+            checkComplete("job_2", job_2);
+            checkComplete("job_3", job_3);
+            checkComplete("job_4", job_4);
+
+            // A failure upstream must be reflected as DEPENDENT_FAILED downstream.
+            if (hasFailed(job_1) || hasFailed(job_2)) {
+                if (job_3.getState() != Job.DEPENDENT_FAILED) {
+                    String states = describe("job_1", job_1)
+                            + describe("job_2", job_2)
+                            + describe("job_3", job_3)
+                            + describe("job_4", job_4);
+                    throw new Exception("The states of jobs 1, 2, 3, 4 are not consistent\n"
+                            + states);
+                }
+            }
+            if (hasFailed(job_3)) {
+                if (job_4.getState() != Job.DEPENDENT_FAILED) {
+                    String states = describe("job_3", job_3)
+                            + describe("job_4", job_4);
+                    throw new Exception("The states of jobs 3, 4 are not consistent\n"
+                            + states);
+                }
+            }
+        } finally {
+            theControl.stop();
+        }
+    }
+
+    public void testJobControl() throws Exception {
+        doJobControlTest();
+    }
+
+    /** Command-line entry point; exits with status 1 if the test fails. */
+    public static void main(String[] args) {
+        TestJobControl test = new TestJobControl();
+        try {
+            test.testJobControl();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            // Signal failure to the invoking shell instead of exiting with 0.
+            System.exit(1);
+        }
+    }
+}


Reply via email to