Author: cutting Date: Tue Aug 15 14:55:52 2006 New Revision: 431715 URL: http://svn.apache.org/viewvc?rev=431715&view=rev Log: HADOOP-322. Add a job control utility. Contributed by Runping.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java Modified: lucene/hadoop/trunk/CHANGES.txt Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=431715&r1=431714&r2=431715&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Aug 15 14:55:52 2006 @@ -46,6 +46,11 @@ critical percentage of the datanodes are unavailable. (Konstantin Shvachko via cutting) +11. HADOOP-322. Add a job control utility. This permits one to + specify job interdependencies. Each job is submitted only after + the jobs it depends on have successfully completed. + (Runping Qi via cutting) + Release 0.5.0 - 2006-08-04 Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java?rev=431715&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java Tue Aug 15 14:55:52 2006 @@ -0,0 +1,331 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.jobcontrol; + + +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.util.StringUtils; + +import java.util.ArrayList; +import java.io.IOException; + +/** This class encapsulates a MapReduce job and its dependency. It monitors + * the states of the depending jobs and updates the state of this job. + * A job starts in the WAITING state. If it does not have any depending jobs, or + * all of the depending jobs are in SUCCESS state, then the job state will become + * READY. If any depending jobs fail, the job will fail too. + * When in READY state, the job can be submitted to Hadoop for execution, with + * the state changing into RUNNING state. From RUNNING state, the job can get into + * SUCCESS or FAILED state, depending on the status of the job execution. 
+ *
+ */
+
+public class Job {
+
+    // A job will be in one of the following states
+    final public static int SUCCESS = 0;
+    final public static int WAITING = 1;
+    final public static int RUNNING = 2;
+    final public static int READY = 3;
+    final public static int FAILED = 4;
+    final public static int DEPENDENT_FAILED = 5;
+
+    private JobConf theJobConf;
+    private int state;
+    private String jobID; // assigned and used by JobControl class
+    private String mapredJobID; // the job ID assigned by map/reduce
+    private String jobName; // external name, assigned/used by client app
+    private String message; // some info for human consumption,
+                            // e.g. the reason why the job failed
+    private ArrayList dependingJobs; // the jobs the current job depends on
+
+    private JobClient jc = null; // the map reduce job client
+
+    /**
+     * Construct a job. The job starts in the WAITING state with placeholder
+     * ("unassigned") IDs and name.
+     * @param jobConf a mapred job configuration representing a job to be executed.
+     * @param dependingJobs the jobs the current job depends on; may be null
+     *        or empty when the job has no dependency
+     * @throws IOException if the job client cannot be created from jobConf
+     */
+    public Job(JobConf jobConf, ArrayList dependingJobs) throws IOException {
+        this.theJobConf = jobConf;
+        this.dependingJobs = dependingJobs;
+        this.state = Job.WAITING;
+        this.jobID = "unassigned";
+        this.mapredJobID = "unassigned";
+        this.jobName = "unassigned";
+        this.message = "just initialized";
+        this.jc = new JobClient(jobConf);
+    }
+
+    /**
+     * @return a multi-line, human readable description of this job: its
+     *         name, IDs, state, message and the names of its depending jobs
+     */
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("job name:\t").append(this.jobName).append("\n");
+        sb.append("job id:\t").append(this.jobID).append("\n");
+        sb.append("job state:\t").append(this.state).append("\n");
+        sb.append("job mapred id:\t").append(this.mapredJobID).append("\n");
+        sb.append("job message:\t").append(this.message).append("\n");
+
+        if (this.dependingJobs == null) {
+            sb.append("job has no depending job:\t").append("\n");
+        } else {
+            int n = this.dependingJobs.size();
+            sb.append("job has ").append(n).append(" dependeng jobs:\n");
+            for (int i = 0; i < n; i++) {
+                sb.append("\t depending job ").append(i).append(":\t");
+                sb.append(((Job) this.dependingJobs.get(i)).getJobName()).append("\n");
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * @return the job name of this job
+     */
+    public String getJobName() {
+        return this.jobName;
+    }
+
+    /**
+     * Set the job name for this job.
+     * @param jobName the job name
+     */
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    /**
+     * @return the job ID of this job
+     */
+    public String getJobID() {
+        return this.jobID;
+    }
+
+    /**
+     * Set the job ID for this job.
+     * @param id the job ID
+     */
+    public void setJobID(String id) {
+        this.jobID = id;
+    }
+
+    /**
+     * @return the mapred ID of this job
+     */
+    public String getMapredJobID() {
+        return this.mapredJobID;
+    }
+
+    /**
+     * Set the mapred ID for this job.
+     * @param mapredJobID the mapred job ID for this job.
+     */
+    public void setMapredJobID(String mapredJobID) {
+        // Must update the mapred ID, not the JobControl-assigned jobID.
+        this.mapredJobID = mapredJobID;
+    }
+
+    /**
+     * @return the mapred job conf of this job
+     */
+    public JobConf getJobConf() {
+        return this.theJobConf;
+    }
+
+    /**
+     * Set the mapred job conf for this job.
+     * @param jobConf the mapred job conf for this job.
+     */
+    public void setJobConf(JobConf jobConf) {
+        this.theJobConf = jobConf;
+    }
+
+    /**
+     * @return the state of this job
+     */
+    public int getState() {
+        return this.state;
+    }
+
+    /**
+     * Set the state for this job.
+     * @param state the new state for this job.
+     */
+    public void setState(int state) {
+        this.state = state;
+    }
+
+    /**
+     * @return the message of this job
+     */
+    public String getMessage() {
+        return this.message;
+    }
+
+    /**
+     * Set the message for this job.
+     * @param message the message for this job.
+     */
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    /**
+     * @return the depending jobs of this job; null if none were given
+     */
+    public ArrayList getDependingJobs() {
+        return this.dependingJobs;
+    }
+
+    /**
+     * @return true if this job is in a complete state
+     *         (SUCCESS, FAILED or DEPENDENT_FAILED)
+     */
+    public boolean isCompleted() {
+        return this.state == Job.FAILED ||
+               this.state == Job.DEPENDENT_FAILED ||
+               this.state == Job.SUCCESS;
+    }
+
+    /**
+     * @return true if this job is in READY state
+     */
+    public boolean isReady() {
+        return this.state == Job.READY;
+    }
+
+    /**
+     * Best-effort cleanup after a failure: kill the mapred job if there is
+     * a handle for it and close the job client. Errors are ignored on
+     * purpose because the job has already been marked FAILED.
+     * @param running the handle of the mapred job, or null if none was obtained
+     */
+    private void cleanupAfterFailure(RunningJob running) {
+        if (running != null) {
+            try {
+                running.killJob();
+            } catch (IOException ignored) {
+                // nothing more can be done for a job that already failed
+            }
+        }
+        try {
+            this.jc.close();
+        } catch (IOException ignored) {
+            // nothing more can be done for a job that already failed
+        }
+    }
+
+    /**
+     * Check the state of this running job. The state may
+     * remain the same, become SUCCESS or FAILED.
+     */
+    private void checkRunningState() {
+        RunningJob running = null;
+        try {
+            running = jc.getJob(this.mapredJobID);
+            if (running == null) {
+                // NOTE(review): presumably getJob returns null for an ID the
+                // job tracker does not know -- confirm against JobClient.
+                // Failing the job is safer than a NullPointerException that
+                // would kill the calling thread.
+                this.state = Job.FAILED;
+                this.message = "Cannot find the running job with mapred ID "
+                    + this.mapredJobID;
+                cleanupAfterFailure(null);
+                return;
+            }
+            if (running.isComplete()) {
+                if (running.isSuccessful()) {
+                    this.state = Job.SUCCESS;
+                } else {
+                    this.state = Job.FAILED;
+                    this.message = "Job failed!";
+                    cleanupAfterFailure(running);
+                }
+            }
+        } catch (IOException ioe) {
+            this.state = Job.FAILED;
+            this.message = StringUtils.stringifyException(ioe);
+            // running is still null when getJob itself threw
+            cleanupAfterFailure(running);
+        }
+    }
+
+    /**
+     * Check and update the state of this job. The state changes
+     * depending on its current state and the states of the depending jobs.
+     * A RUNNING job is polled through the job client; a WAITING job becomes
+     * READY once every depending job is in SUCCESS state, or DEPENDENT_FAILED
+     * as soon as one of them has failed. Calling this method also updates
+     * the states of the depending jobs that are examined.
+     * @return the state of this job after the check
+     */
+    public int checkState() {
+        if (this.state == Job.RUNNING) {
+            checkRunningState();
+        }
+        if (this.state != Job.WAITING) {
+            return this.state;
+        }
+        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+            this.state = Job.READY;
+            return this.state;
+        }
+        int n = this.dependingJobs.size();
+        for (int i = 0; i < n; i++) {
+            Job pred = (Job) this.dependingJobs.get(i);
+            int s = pred.checkState();
+            if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
+                // a pred is still not completed, continue in WAITING state
+                break;
+            }
+            if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
+                this.state = Job.DEPENDENT_FAILED;
+                this.message = "depending job " + i + " with jobID "
+                    + pred.getJobID() + " failed. " + pred.getMessage();
+                break;
+            }
+            // pred must be in success state
+            if (i == n - 1) {
+                this.state = Job.READY;
+            }
+        }
+
+        return this.state;
+    }
+
+    /**
+     * Submit this job to mapred. The state becomes RUNNING if submission
+     * is successful, FAILED otherwise. When the job conf sets
+     * "create.empty.dir.if.nonexist" to true, input paths that do not exist
+     * are created (best effort) before the submission.
+     */
+    public void submit() {
+        try {
+            if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
+                FileSystem fs = FileSystem.get(theJobConf);
+                Path inputPaths[] = theJobConf.getInputPaths();
+                for (int i = 0; i < inputPaths.length; i++) {
+                    if (!fs.exists(inputPaths[i])) {
+                        try {
+                            fs.mkdirs(inputPaths[i]);
+                        } catch (IOException ignored) {
+                            // best effort: if the directory is really needed
+                            // the submission below reports the problem
+                        }
+                    }
+                }
+            }
+            RunningJob running = jc.submitJob(theJobConf);
+            this.mapredJobID = running.getJobID();
+            this.state = Job.RUNNING;
+        } catch (IOException ioe) {
+            this.state = Job.FAILED;
+            this.message = StringUtils.stringifyException(ioe);
+        }
+    }
+
+    /**
+     * Placeholder entry point; does nothing.
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        // intentionally empty
+    }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java?rev=431715&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java Tue Aug 15 14:55:52 2006
@@ -0,0 +1,299 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.jobcontrol;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+
+/** This class encapsulates a set of MapReduce jobs and its dependency. It tracks
+ * the states of the jobs by placing them into different tables according to their
+ * states.
+ *
+ * This class provides APIs for the client app to add a job to the group and to get
+ * the jobs in the group in different states. When a
+ * job is added, an ID unique to the group is assigned to the job.
+ *
+ * This class has a thread that submits jobs when they become ready, monitors the
+ * states of the running jobs, and updates the states of jobs based on the state changes
+ * of their depending jobs states. The class provides APIs for suspending/resuming
+ * the thread, and for stopping the thread.
+ *
+ */
+public class JobControl implements Runnable{
+
+    // The thread can be in one of the following state
+    private static final int RUNNING = 0;
+    private static final int SUSPENDED = 1;
+    private static final int STOPPED = 2;
+    private static final int STOPPING = 3;
+    private static final int READY = 4;
+
+    // how long the controller thread sleeps between two rounds of checks
+    private static final long POLL_INTERVAL_MILLIS = 5000;
+
+    // The thread state. It is written by client threads (stop/suspend/resume)
+    // and read by the controller loop, hence volatile so that a change is
+    // seen by the other thread.
+    private volatile int runnerState;
+
+    private Hashtable waitingJobs;
+    private Hashtable readyJobs;
+    private Hashtable runningJobs;
+    private Hashtable successfulJobs;
+    private Hashtable failedJobs;
+
+    private long nextJobID;
+    private String groupName;
+
+    /**
+     * Construct a job control for a group of jobs.
+     * @param groupName a name identifying this group
+     */
+    public JobControl(String groupName) {
+        this.waitingJobs = new Hashtable();
+        this.readyJobs = new Hashtable();
+        this.runningJobs = new Hashtable();
+        this.successfulJobs = new Hashtable();
+        this.failedJobs = new Hashtable();
+        this.nextJobID = -1;
+        this.groupName = groupName;
+        this.runnerState = JobControl.READY;
+    }
+
+    /**
+     * @param jobs a table of jobs keyed by job ID
+     * @return a new list holding the jobs of the given table
+     */
+    private static ArrayList toArrayList(Hashtable jobs) {
+        ArrayList retv = new ArrayList();
+        Iterator iter = jobs.values().iterator();
+        while (iter.hasNext()) {
+            retv.add(iter.next());
+        }
+        return retv;
+    }
+
+    // The getters below are synchronized because the controller thread
+    // replaces the table references inside synchronized methods; reading
+    // them under the same monitor yields a consistent snapshot.
+
+    /**
+     * @return the jobs in the waiting state
+     */
+    synchronized public ArrayList getWaitingJobs() {
+        return JobControl.toArrayList(this.waitingJobs);
+    }
+
+    /**
+     * @return the jobs in the running state
+     */
+    synchronized public ArrayList getRunningJobs() {
+        return JobControl.toArrayList(this.runningJobs);
+    }
+
+    /**
+     * @return the jobs in the ready state
+     */
+    synchronized public ArrayList getReadyJobs() {
+        return JobControl.toArrayList(this.readyJobs);
+    }
+
+    /**
+     * @return the jobs in the success state
+     */
+    synchronized public ArrayList getSuccessfulJobs() {
+        return JobControl.toArrayList(this.successfulJobs);
+    }
+
+    /**
+     * @return the jobs in the failed state (FAILED or DEPENDENT_FAILED)
+     */
+    synchronized public ArrayList getFailedJobs() {
+        return JobControl.toArrayList(this.failedJobs);
+    }
+
+    /**
+     * @return the next job ID of this group: the group name followed by a
+     *         counter. Only called from the synchronized addJob.
+     */
+    private String getNextJobID() {
+        nextJobID += 1;
+        return this.groupName + this.nextJobID;
+    }
+
+    /**
+     * Put a job into the given table, keyed by its job ID.
+     */
+    private static void addToQueue(Job aJob, Hashtable queue) {
+        synchronized(queue) {
+            queue.put(aJob.getJobID(), aJob);
+        }
+    }
+
+    /**
+     * Put a job into the table matching its current state. A job whose state
+     * is not one of the Job state constants is marked FAILED and filed with
+     * the failed jobs, so that it is neither lost nor able to crash the
+     * controller thread.
+     */
+    private void addToQueue(Job aJob) {
+        Hashtable queue = getQueue(aJob.getState());
+        if (queue == null) {
+            aJob.setMessage("unknown job state " + aJob.getState()
+                + "; the job is treated as failed");
+            aJob.setState(Job.FAILED);
+            queue = this.failedJobs;
+        }
+        addToQueue(aJob, queue);
+    }
+
+    /**
+     * @param state one of the Job state constants
+     * @return the table holding the jobs in the given state, or null if the
+     *         state is unknown
+     */
+    private Hashtable getQueue(int state) {
+        Hashtable retv = null;
+        if (state == Job.WAITING) {
+            retv = this.waitingJobs;
+        } else if (state == Job.READY) {
+            retv = this.readyJobs;
+        } else if (state == Job.RUNNING) {
+            retv = this.runningJobs;
+        } else if (state == Job.SUCCESS) {
+            retv = this.successfulJobs;
+        } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
+            retv = this.failedJobs;
+        }
+        return retv;
+    }
+
+    /**
+     * Add a new job. The job is given an ID unique to this group and is
+     * put into the WAITING state.
+     * @param aJob the new job
+     * @return the ID assigned to the job
+     */
+    synchronized public String addJob(Job aJob) {
+        String id = this.getNextJobID();
+        aJob.setJobID(id);
+        aJob.setState(Job.WAITING);
+        this.addToQueue(aJob);
+        return id;
+    }
+
+    /**
+     * Placeholder entry point; does nothing.
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        // intentionally empty
+    }
+
+    /**
+     * @return the thread state
+     */
+    public int getState() {
+        return this.runnerState;
+    }
+
+    /**
+     * set the thread state to STOPPING so that the
+     * thread will stop when it wakes up.
+     */
+    public void stop() {
+        this.runnerState = JobControl.STOPPING;
+    }
+
+    /**
+     * suspend the running thread
+     */
+    public void suspend () {
+        if (this.runnerState == JobControl.RUNNING) {
+            this.runnerState = JobControl.SUSPENDED;
+        }
+    }
+
+    /**
+     * resume the suspended thread
+     */
+    public void resume () {
+        if (this.runnerState == JobControl.SUSPENDED) {
+            this.runnerState = JobControl.RUNNING;
+        }
+    }
+
+    /**
+     * Re-check every running job and file it under its (possibly new) state.
+     */
+    synchronized private void checkRunningJobs() {
+        Hashtable oldJobs = this.runningJobs;
+        this.runningJobs = new Hashtable();
+
+        Iterator jobs = oldJobs.values().iterator();
+        while (jobs.hasNext()) {
+            Job nextJob = (Job)jobs.next();
+            nextJob.checkState();
+            this.addToQueue(nextJob);
+        }
+    }
+
+    /**
+     * Re-check every waiting job and file it under its (possibly new) state.
+     */
+    synchronized private void checkWaitingJobs() {
+        Hashtable oldJobs = this.waitingJobs;
+        this.waitingJobs = new Hashtable();
+
+        Iterator jobs = oldJobs.values().iterator();
+        while (jobs.hasNext()) {
+            Job nextJob = (Job)jobs.next();
+            nextJob.checkState();
+            this.addToQueue(nextJob);
+        }
+    }
+
+    /**
+     * Submit every ready job and file it under the state resulting from the
+     * submission (RUNNING or FAILED).
+     */
+    synchronized private void startReadyJobs() {
+        Hashtable oldJobs = this.readyJobs;
+        this.readyJobs = new Hashtable();
+
+        Iterator jobs = oldJobs.values().iterator();
+        while (jobs.hasNext()) {
+            Job nextJob = (Job)jobs.next();
+            nextJob.submit();
+            this.addToQueue(nextJob);
+        }
+    }
+
+    /**
+     * @return true if no job is waiting, ready or running any more
+     */
+    synchronized public boolean allFinished() {
+        return this.waitingJobs.size() == 0 &&
+               this.readyJobs.size() == 0 &&
+               this.runningJobs.size() == 0;
+    }
+
+    /**
+     * @return true if the thread state is neither RUNNING nor SUSPENDED,
+     *         i.e. the main loop has been asked to terminate
+     */
+    private boolean shouldStop() {
+        int current = this.runnerState; // read once for a consistent test
+        return current != JobControl.RUNNING &&
+               current != JobControl.SUSPENDED;
+    }
+
+    /**
+     * Sleep for one polling interval. An interrupt only shortens the pause;
+     * the loop keeps going until stop() is called.
+     */
+    private static void pause() {
+        try {
+            Thread.sleep(POLL_INTERVAL_MILLIS);
+        } catch (InterruptedException ignored) {
+            // deliberately ignored, see the method comment
+        }
+    }
+
+    /**
+     * The main loop for the thread.
+     * The loop does the following:
+     *   Check the states of the running jobs
+     *   Update the states of waiting jobs
+     *   Submit the jobs in ready state
+     */
+    public void run() {
+        this.runnerState = JobControl.RUNNING;
+        while (true) {
+            while (this.runnerState == JobControl.SUSPENDED) {
+                pause();
+            }
+            checkRunningJobs();
+            checkWaitingJobs();
+            startReadyJobs();
+            if (shouldStop()) {
+                break;
+            }
+            pause();
+            if (shouldStop()) {
+                break;
+            }
+        }
+        this.runnerState = JobControl.STOPPED;
+    }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html?rev=431715&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/package.html Tue Aug 15 14:55:52 2006
@@ -0,0 +1,7 @@
+<html>
+<body>
+
+<p>Utilities for managing dependent
jobs.</p> + +</body> +</html> Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java?rev=431715&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java Tue Aug 15 14:55:52 2006 @@ -0,0 +1,297 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.mapred.jobcontrol;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This class performs unit test for Job/JobControl classes.
+ *
+ * @author runping
+ *
+ */
+public class TestJobControl extends junit.framework.TestCase {
+
+    private static NumberFormat idFormat = NumberFormat.getInstance();
+    static {
+        idFormat.setMinimumIntegerDigits(4);
+        idFormat.setGroupingUsed(false);
+    }
+
+    static private Random rand = new Random();
+
+    /**
+     * Delete the given directory so that the test starts from a clean state.
+     */
+    private static void cleanData(FileSystem fs, Path dirPath)
+        throws IOException {
+        fs.delete(dirPath);
+    }
+
+    /**
+     * @return a random number formatted as a "word" of at least 4 digits
+     */
+    private static String generateRandomWord() {
+        return idFormat.format(rand.nextLong());
+    }
+
+    /**
+     * @return a newline-terminated line of random words. The remainder
+     *         below lies in [-6, 6], so a line has between 14 and 26 words.
+     */
+    private static String generateRandomLine() {
+        long r = rand.nextLong() % 7;
+        long n = r + 20;
+        StringBuffer sb = new StringBuffer();
+        for (int i = 0; i < n; i++) {
+            sb.append(generateRandomWord()).append(" ");
+        }
+        sb.append("\n");
+        return sb.toString();
+    }
+
+    /**
+     * Write 100000 random lines into the file data.txt under dirPath.
+     */
+    private static void generateData(FileSystem fs, Path dirPath)
+        throws IOException {
+        FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt"));
+        try {
+            for (int i = 0; i < 100000; i++) {
+                String line = TestJobControl.generateRandomLine();
+                out.write(line.getBytes("UTF-8"));
+            }
+        } finally {
+            // close the stream even when a write fails
+            out.close();
+        }
+    }
+
+    /**
+     * A mapper/reducer pair that copies its input: the mapper emits the
+     * key (as text) with the unchanged value, the reducer emits every value
+     * under an empty key.
+     */
+    public static class DataCopy extends MapReduceBase implements Mapper,
+        Reducer {
+        public void map(WritableComparable key, Writable value,
+            OutputCollector output, Reporter reporter) throws IOException {
+            output.collect(new Text(key.toString()), value);
+        }
+
+        public void reduce(WritableComparable key, Iterator values,
+            OutputCollector output, Reporter reporter) throws IOException {
+            Text dumbKey = new Text("");
+            while (values.hasNext()) {
+                Text data = (Text) values.next();
+                output.collect(dumbKey, data);
+            }
+        }
+    }
+
+    /**
+     * Create the configuration of a job copying the data of the given input
+     * dirs to the given output dir.
+     * @param indirs the input dirs (Path objects); must not be empty
+     * @param outdir the output dir
+     */
+    private static JobConf createCopyJob(ArrayList indirs, Path outdir)
+        throws Exception {
+
+        Configuration defaults = new Configuration();
+        JobConf theJob = new JobConf(defaults, TestJobControl.class);
+        theJob.setJobName("DataMoveJob");
+
+        theJob.setInputPath((Path) indirs.get(0));
+        for (int i = 1; i < indirs.size(); i++) {
+            theJob.addInputPath((Path) indirs.get(i));
+        }
+        theJob.setMapperClass(DataCopy.class);
+        theJob.setOutputPath(outdir);
+        theJob.setOutputKeyClass(Text.class);
+        theJob.setOutputValueClass(Text.class);
+        theJob.setReducerClass(DataCopy.class);
+        theJob.setNumMapTasks(12);
+        theJob.setNumReduceTasks(4);
+        return theJob;
+    }
+
+    /**
+     * Print how many jobs of the given job control are in each state.
+     */
+    private static void printJobCounts(JobControl theControl) {
+        System.out.println("Jobs in waiting state: "
+            + theControl.getWaitingJobs().size());
+        System.out.println("Jobs in ready state: "
+            + theControl.getReadyJobs().size());
+        System.out.println("Jobs in running state: "
+            + theControl.getRunningJobs().size());
+        System.out.println("Jobs in success state: "
+            + theControl.getSuccessfulJobs().size());
+        System.out.println("Jobs in failed state: "
+            + theControl.getFailedJobs().size());
+        System.out.println("\n");
+    }
+
+    /**
+     * Fail unless the given job is in a complete state
+     * (FAILED, DEPENDENT_FAILED or SUCCESS).
+     * @param label the name used for the job in the error message
+     * @param job the job to check
+     */
+    private static void checkCompleted(String label, Job job) throws Exception {
+        int state = job.getState();
+        if (state != Job.FAILED &&
+            state != Job.DEPENDENT_FAILED &&
+            state != Job.SUCCESS) {
+
+            String states = label + ": " + state + "\n";
+            throw new Exception("The state of " + label
+                + " is not in a complete state\n" + states);
+        }
+    }
+
+    /**
+     * This is a main function for testing JobControl class.
+     * It first cleans all the dirs it will use. Then it generates some random text
+     * data in TestJobControlData/indir. Then it creates 4 jobs:
+     *      Job 1: copy data from indir to outdir_1
+     *      Job 2: copy data from indir to outdir_2
+     *      Job 3: copy data from outdir_1 and outdir_2 to outdir_3
+     *      Job 4: copy data from outdir_3 to outdir_4
+     * The jobs 1 and 2 have no dependency. The job 3 depends on jobs 1 and 2.
+     * The job 4 depends on job 3.
+     *
+     * Then it creates a JobControl object and add the 4 jobs to the JobControl object.
+     * Finally, it creates a thread to run the JobControl object and monitors/reports
+     * the job states.
+     */
+    public static void doJobControlTest() throws Exception {
+
+        Configuration defaults = new Configuration();
+        FileSystem fs = FileSystem.get(defaults);
+        Path rootDataDir = new Path(System.getProperty("test.build.data", "."), "TestJobControlData");
+        Path indir = new Path(rootDataDir, "indir");
+        Path outdir_1 = new Path(rootDataDir, "outdir_1");
+        Path outdir_2 = new Path(rootDataDir, "outdir_2");
+        Path outdir_3 = new Path(rootDataDir, "outdir_3");
+        Path outdir_4 = new Path(rootDataDir, "outdir_4");
+
+        cleanData(fs, indir);
+        generateData(fs, indir);
+
+        cleanData(fs, outdir_1);
+        cleanData(fs, outdir_2);
+        cleanData(fs, outdir_3);
+        cleanData(fs, outdir_4);
+
+        ArrayList dependingJobs = null;
+
+        ArrayList inPaths_1 = new ArrayList();
+        inPaths_1.add(indir);
+        JobConf jobConf_1 = createCopyJob(inPaths_1, outdir_1);
+        Job job_1 = new Job(jobConf_1, dependingJobs);
+        ArrayList inPaths_2 = new ArrayList();
+        inPaths_2.add(indir);
+        JobConf jobConf_2 = createCopyJob(inPaths_2, outdir_2);
+        Job job_2 = new Job(jobConf_2, dependingJobs);
+
+        ArrayList inPaths_3 = new ArrayList();
+        inPaths_3.add(outdir_1);
+        inPaths_3.add(outdir_2);
+        JobConf jobConf_3 = createCopyJob(inPaths_3, outdir_3);
+        dependingJobs = new ArrayList();
+        dependingJobs.add(job_1);
+        dependingJobs.add(job_2);
+        Job job_3 = new Job(jobConf_3, dependingJobs);
+
+        ArrayList inPaths_4 = new ArrayList();
+        inPaths_4.add(outdir_3);
+        JobConf jobConf_4 = createCopyJob(inPaths_4, outdir_4);
+        dependingJobs = new ArrayList();
+        dependingJobs.add(job_3);
+        Job job_4 = new Job(jobConf_4, dependingJobs);
+
+        JobControl theControl = new JobControl("Test");
+        theControl.addJob(job_1);
+        theControl.addJob(job_2);
+        theControl.addJob(job_3);
+        theControl.addJob(job_4);
+
+        Thread theController = new Thread(theControl);
+        theController.start();
+        try {
+            while (!theControl.allFinished()) {
+                printJobCounts(theControl);
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException ignored) {
+                    // keep polling until all the jobs are finished
+                }
+            }
+            System.out.println("Jobs are all done???");
+            printJobCounts(theControl);
+
+            checkCompleted("job_1", job_1);
+            checkCompleted("job_2", job_2);
+            checkCompleted("job_3", job_3);
+            checkCompleted("job_4", job_4);
+
+            if (job_1.getState() == Job.FAILED ||
+                job_2.getState() == Job.FAILED ||
+                job_1.getState() == Job.DEPENDENT_FAILED ||
+                job_2.getState() == Job.DEPENDENT_FAILED) {
+                if (job_3.getState() != Job.DEPENDENT_FAILED) {
+                    // report the states of all the jobs, not only the last one
+                    String states = "job_1: " + job_1.getState() + "\n";
+                    states += "job_2: " + job_2.getState() + "\n";
+                    states += "job_3: " + job_3.getState() + "\n";
+                    states += "job_4: " + job_4.getState() + "\n";
+                    throw new Exception("The states of jobs 1, 2, 3, 4 are not consistent\n" + states);
+                }
+            }
+            if (job_3.getState() == Job.FAILED ||
+                job_3.getState() == Job.DEPENDENT_FAILED) {
+                if (job_4.getState() != Job.DEPENDENT_FAILED) {
+                    String states = "job_3: " + job_3.getState() + "\n";
+                    states += "job_4: " + job_4.getState() + "\n";
+                    throw new Exception("The states of jobs 3, 4 are not consistent\n" + states);
+                }
+            }
+        } finally {
+            // always stop the controller thread, also when a check failed,
+            // so that it does not keep running after the test
+            theControl.stop();
+        }
+    }
+
+    public void testJobControl() throws Exception {
+        doJobControlTest();
+    }
+
+    public static void main(String[] args) {
+        TestJobControl test = new TestJobControl();
+        try {
+            test.testJobControl();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}