Author: acmurthy
Date: Sat Jan 12 18:49:01 2008
New Revision: 611527

URL: http://svn.apache.org/viewvc?rev=611527&view=rev
Log:
HADOOP-1876. Persist statuses of completed jobs in HDFS so that the
JobClient can query information about retired jobs, including across
JobTracker restarts. Contributed by Alejandro Abdelnur.
Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=611527&r1=611526&r2=611527&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Sat Jan 12 18:49:01 2008
@@ -217,6 +217,16 @@
   HADOOP-2464. Unit tests for chmod, chown, and chgrp using DFS.
   (Raghu Angadi)
 
+  HADOOP-1876. Persist statuses of completed jobs in HDFS so that the
+  JobClient can query information about retired jobs, including across
+  JobTracker restarts.
+  Configuration changes to hadoop-default.xml:
+    add mapred.job.tracker.persist.jobstatus.active (default value of false)
+    add mapred.job.tracker.persist.jobstatus.hours (default value of 0)
+    add mapred.job.tracker.persist.jobstatus.dir (default value of
+    /jobtracker/jobsInfo)
+  (Alejandro Abdelnur via acmurthy)
+
 OPTIMIZATIONS
 
   HADOOP-1898. Release the lock protecting the last time of the last stack

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=611527&r1=611526&r2=611527&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Sat Jan 12 18:49:01 2008
@@ -893,6 +893,33 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.job.tracker.persist.jobstatus.active</name>
+  <value>false</value>
+  <description>Indicates whether persistence of job status information
+  is active or not.
+  </description>
+</property>
+
+<property>
+  <name>mapred.job.tracker.persist.jobstatus.hours</name>
+  <value>0</value>
+  <description>The number of hours job status information is persisted in DFS.
+  The job status information will be available after it drops off the memory
+  queue and between jobtracker restarts. With a zero value the job status
+  information is not persisted at all in DFS.
+  </description>
+</property>
+
+<property>
+  <name>mapred.job.tracker.persist.jobstatus.dir</name>
+  <value>/jobtracker/jobsInfo</value>
+  <description>The directory where the job status information is persisted
+  in a file system to be available after it drops off the memory queue and
+  between jobtracker restarts.
+  </description>
+</property>
+
 <!-- ipc properties -->
 
 <property>
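For reference, a minimal hadoop-site.xml override that turns the feature on
might look like the following sketch (the property names are the ones added
above; the 24-hour retention is an illustrative value, not part of this patch):

  <property>
    <name>mapred.job.tracker.persist.jobstatus.active</name>
    <value>true</value>
  </property>

  <property>
    <name>mapred.job.tracker.persist.jobstatus.hours</name>
    <value>24</value>
  </property>

Note that CompletedJobStatusStore.store() below only writes job information
when persistence is active and the retain time is greater than zero, so both
properties need to be set.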
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java?rev=611527&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java Sat Jan 12 18:49:01 2008
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+import java.io.IOException;
+
+/**
+ * Persists and retrieves the job info of a job into/from DFS.
+ * <p/>
+ * If the retain time is zero, jobs are not persisted.
+ * <p/>
+ * A daemon thread cleans up job info files older than the retain time.
+ * <p/>
+ * The retain time can be set with the
+ * 'mapred.job.tracker.persist.jobstatus.hours' configuration variable
+ * (it is in hours).
+ */
+public class CompletedJobStatusStore implements Runnable {
+  private boolean active;
+  private String jobInfoDir;
+  private long retainTime;
+  private FileSystem fs;
+  private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
+
+  public static final Log LOG =
+          LogFactory.getLog(CompletedJobStatusStore.class);
+
+  private static long HOUR = 1000 * 60 * 60;
+  private static long SLEEP_TIME = 1 * HOUR;
+
+  CompletedJobStatusStore(Configuration conf) throws IOException {
+    active =
+      conf.getBoolean("mapred.job.tracker.persist.jobstatus.active", false);
+
+    if (active) {
+      fs = FileSystem.get(conf);
+      retainTime =
+        conf.getInt("mapred.job.tracker.persist.jobstatus.hours", 0) * HOUR;
+
+      jobInfoDir =
+        conf.get("mapred.job.tracker.persist.jobstatus.dir", JOB_INFO_STORE_DIR);
+
+      Path path = new Path(jobInfoDir);
+      if (!fs.exists(path)) {
+        fs.mkdirs(path);
+      }
+
+      if (retainTime == 0) {
+        // as the retain time is zero, all stored job statuses are deleted.
+        deleteJobStatusDirs();
+      }
+    }
+  }
+
+  /**
+   * Indicates if job status persistence is active or not.
+   *
+   * @return TRUE if active, FALSE otherwise.
+   */
+  public boolean isActive() {
+    return active;
+  }
+
+  public void run() {
+    if (retainTime > 0) {
+      while (true) {
+        deleteJobStatusDirs();
+        try {
+          Thread.sleep(SLEEP_TIME);
+        }
+        catch (InterruptedException ex) {
+          break;
+        }
+      }
+    }
+  }
+
+  private void deleteJobStatusDirs() {
+    try {
+      long currentTime = System.currentTimeMillis();
+      Path[] jobInfoFiles = fs.listPaths(
+              new Path[]{new Path(jobInfoDir)});
+
+      //noinspection ForLoopReplaceableByForEach
+      for (Path jobInfo : jobInfoFiles) {
+        try {
+          FileStatus status = fs.getFileStatus(jobInfo);
+          if ((currentTime - status.getModificationTime()) > retainTime) {
+            fs.delete(jobInfo);
+          }
+        }
+        catch (IOException ie) {
+          LOG.warn("Could not do housekeeping for [" +
+                   jobInfo + "] job info : " + ie.getMessage(), ie);
+        }
+      }
+    }
+    catch (IOException ie) {
+      LOG.warn("Could not obtain job info files : " + ie.getMessage(), ie);
+    }
+  }
+
+  private Path getInfoFilePath(String jobId) {
+    return new Path(jobInfoDir, jobId + ".info");
+  }
+
+  /**
+   * Persists a job in DFS.
+   *
+   * @param job the job about to be 'retired'
+   */
+  public void store(JobInProgress job) {
+    if (active && retainTime > 0) {
+      String jobId = job.getStatus().getJobId();
+      Path jobStatusFile = getInfoFilePath(jobId);
+      try {
+        FSDataOutputStream dataOut = fs.create(jobStatusFile);
+
+        job.getStatus().write(dataOut);
+
+        job.getProfile().write(dataOut);
+
+        job.getCounters().write(dataOut);
+
+        TaskCompletionEvent[] events =
+                job.getTaskCompletionEvents(0, Integer.MAX_VALUE);
+        dataOut.writeInt(events.length);
+        for (TaskCompletionEvent event : events) {
+          event.write(dataOut);
+        }
+
+        dataOut.close();
+      } catch (IOException ex) {
+        LOG.warn("Could not store [" + jobId + "] job info : " +
+                 ex.getMessage(), ex);
+        try {
+          fs.delete(jobStatusFile);
+        }
+        catch (IOException ex1) {
+          //ignore
+        }
+      }
+    }
+  }
+
+  private FSDataInputStream getJobInfoFile(String jobId) throws IOException {
+    Path jobStatusFile = getInfoFilePath(jobId);
+    return (fs.exists(jobStatusFile)) ? fs.open(jobStatusFile) : null;
+  }
+
+  private JobStatus readJobStatus(FSDataInputStream dataIn) throws IOException {
+    JobStatus jobStatus = new JobStatus();
+    jobStatus.readFields(dataIn);
+    return jobStatus;
+  }
+
+  private JobProfile readJobProfile(FSDataInputStream dataIn)
+          throws IOException {
+    JobProfile jobProfile = new JobProfile();
+    jobProfile.readFields(dataIn);
+    return jobProfile;
+  }
+
+  private Counters readCounters(FSDataInputStream dataIn) throws IOException {
+    Counters counters = new Counters();
+    counters.readFields(dataIn);
+    return counters;
+  }
+
+  private TaskCompletionEvent[] readEvents(FSDataInputStream dataIn,
+                                           int offset, int len)
+          throws IOException {
+    int size = dataIn.readInt();
+    if (offset > size) {
+      return TaskCompletionEvent.EMPTY_ARRAY;
+    }
+    if (offset + len > size) {
+      len = size - offset;
+    }
+    TaskCompletionEvent[] events = new TaskCompletionEvent[len];
+    for (int i = 0; i < (offset + len); i++) {
+      TaskCompletionEvent event = new TaskCompletionEvent();
+      event.readFields(dataIn);
+      if (i >= offset) {
+        events[i - offset] = event;
+      }
+    }
+    return events;
+  }
+
+  /**
+   * Retrieves the JobStatus information stored in DFS by the store method.
+   *
+   * @param jobId the jobId for which the JobStatus is queried
+   * @return the JobStatus object, or null if it could not be retrieved
+   */
+  public JobStatus readJobStatus(String jobId) {
+    JobStatus jobStatus = null;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          jobStatus = readJobStatus(dataIn);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job status : " + ex, ex);
+      }
+    }
+    return jobStatus;
+  }
+
+  /**
+   * Retrieves the JobProfile information stored in DFS by the store method.
+   *
+   * @param jobId the jobId for which the JobProfile is queried
+   * @return the JobProfile object, or null if it could not be retrieved
+   */
+  public JobProfile readJobProfile(String jobId) {
+    JobProfile jobProfile = null;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          readJobStatus(dataIn);
+          jobProfile = readJobProfile(dataIn);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job profile : " + ex, ex);
+      }
+    }
+    return jobProfile;
+  }
+
+  /**
+   * Retrieves the Counters information stored in DFS by the store method.
+   *
+   * @param jobId the jobId for which the Counters are queried
+   * @return the Counters object, or null if it could not be retrieved
+   */
+  public Counters readCounters(String jobId) {
+    Counters counters = null;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          readJobStatus(dataIn);
+          readJobProfile(dataIn);
+          counters = readCounters(dataIn);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job counters : " + ex, ex);
+      }
+    }
+    return counters;
+  }
+
+  /**
+   * Retrieves the TaskCompletionEvent information stored in DFS by the
+   * store method.
+   *
+   * @param jobId the jobId for which TaskCompletionEvents are queried
+   * @param fromEventId events offset
+   * @param maxEvents max number of events
+   * @return a TaskCompletionEvent[], empty if it could not be retrieved
+   */
+  public TaskCompletionEvent[] readJobTaskCompletionEvents(String jobId,
+                                                           int fromEventId,
+                                                           int maxEvents) {
+    TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          readJobStatus(dataIn);
+          readJobProfile(dataIn);
+          readCounters(dataIn);
+          events = readEvents(dataIn, fromEventId, maxEvents);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job events : " + ex, ex);
+      }
+    }
+    return events;
+  }
+
+}
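For orientation, the layout of a <jobId>.info file follows the write order in
store() above, and the readers consume it positionally (this is a summary of
the calls in this class, not a separate on-disk specification):

  JobStatus                 JobStatus.write(dataOut)
  JobProfile                JobProfile.write(dataOut)
  Counters                  Counters.write(dataOut)
  int                       number of task completion events
  TaskCompletionEvent[n]    one TaskCompletionEvent.write(dataOut) per event

This is why readJobProfile(String), readCounters(String) and
readJobTaskCompletionEvents(String, int, int) each read and discard the
records preceding the one they return.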
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=611527&r1=611526&r2=611527&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Sat Jan 12 18:49:01 2008
@@ -563,7 +563,10 @@
   ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
   Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
                                                 "expireLaunchingTasks");
-
+
+  CompletedJobStatusStore completedJobStatusStore = null;
+  Thread completedJobsStoreThread = null;
+
   /**
    * It might seem like a bug to maintain a TreeSet of status objects,
    * which can be updated at any time. But that's not what happens! We
@@ -701,6 +704,10 @@
     synchronized (this) {
       state = State.RUNNING;
     }
+
+    //initializes the job status store
+    completedJobStatusStore = new CompletedJobStatusStore(conf);
+
     LOG.info("Starting RUNNING");
   }
@@ -726,6 +733,12 @@
     this.taskCommitThread = new TaskCommitQueue();
     this.taskCommitThread.start();
 
+    if (completedJobStatusStore.isActive()) {
+      completedJobsStoreThread = new Thread(completedJobStatusStore,
+                                            "completedjobsStore-housekeeper");
+      completedJobsStoreThread.start();
+    }
+
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
@@ -788,6 +801,16 @@
         ex.printStackTrace();
       }
     }
+    if (this.completedJobsStoreThread != null &&
+        this.completedJobsStoreThread.isAlive()) {
+      LOG.info("Stopping completedJobsStore thread");
+      this.completedJobsStoreThread.interrupt();
+      try {
+        this.completedJobsStoreThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
     LOG.info("stopped all jobtracker services");
     return;
   }
@@ -895,7 +918,7 @@
           LOG.info("Removed completed task '" + taskid + "' from '" +
                    taskTracker + "'");
         }
-        // Clear 
+        // Clear
         trackerToMarkedTasksMap.remove(taskTracker);
       }
     }
@@ -937,6 +960,9 @@
       // Mark the 'non-running' tasks for pruning
       markCompletedJob(job);
+
+      //persists the job info in DFS
+      completedJobStatusStore.store(job);
 
       JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
 
       // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
@@ -1611,7 +1637,7 @@
     if (job != null) {
       return job.getProfile();
     } else {
-      return null;
+      return completedJobStatusStore.readJobProfile(jobid);
     }
   }
   public synchronized JobStatus getJobStatus(String jobid) {
@@ -1619,7 +1645,7 @@
     if (job != null) {
       return job.getStatus();
     } else {
-      return null;
+      return completedJobStatusStore.readJobStatus(jobid);
     }
   }
   public synchronized Counters getJobCounters(String jobid) {
@@ -1627,7 +1653,7 @@
     if (job != null) {
       return job.getCounters();
     } else {
-      return null;
+      return completedJobStatusStore.readCounters(jobid);
     }
   }
   public synchronized TaskReport[] getMapTaskReports(String jobid) {
@@ -1679,10 +1705,13 @@
    */
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
       String jobid, int fromEventId, int maxEvents) throws IOException{
-    TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+    TaskCompletionEvent[] events;
     JobInProgress job = this.jobs.get(jobid);
     if (null != job) {
      events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+    }
+    else {
+      events = completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
     }
     return events;
   }
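The client-side effect of these fallbacks is that a completed job can still be
queried after it leaves the JobTracker's memory queue or the JobTracker
restarts. A minimal sketch (QueryRetiredJob is a hypothetical illustration,
not part of this patch):

  import java.io.IOException;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.RunningJob;

  public class QueryRetiredJob {
    public static void main(String[] args) throws IOException {
      String jobId = args[0];            // ID of a previously completed job
      JobClient jc = new JobClient(new JobConf());
      // getJob() asks the JobTracker, which now falls back to the
      // CompletedJobStatusStore when the job is no longer in memory.
      RunningJob job = jc.getJob(jobId);
      if (job == null) {
        System.out.println("Job not found (persistence off or info expired)");
      } else {
        System.out.println(job.getJobName() + " successful: " +
                           job.isSuccessful());
      }
    }
  }

TestJobStatusPersistency below exercises exactly this path across a
JobTracker restart.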
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=611527&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Sat Jan 12 18:49:01 2008
@@ -0,0 +1,178 @@
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Test case to run a MapReduce job.
+ * <p/>
+ * It runs a 2-node MapReduce cluster on top of a 2-node DFS.
+ * <p/>
+ * The JobConf to use must be obtained via the createJobConf() method.
+ * <p/>
+ * It creates a temporary directory, accessible via getTestRootDir(),
+ * for both input and output.
+ * <p/>
+ * The input directory is accessible via getInputDir() and the output
+ * directory via getOutputDir().
+ * <p/>
+ * The DFS filesystem is formatted before the testcase starts.
+ */
+public abstract class ClusterMapReduceTestCase extends TestCase {
+  private MiniDFSCluster dfsCluster = null;
+  private MiniMRCluster mrCluster = null;
+  private FileSystem fileSystem = null;
+
+  /**
+   * Creates the Hadoop MapReduce cluster and DFS before a test case is run.
+   *
+   * @throws Exception
+   */
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    startCluster(true, null);
+  }
+
+  /**
+   * Starts the cluster within a testcase.
+   * <p/>
+   * Note that the cluster is already started when the testcase method
+   * is invoked. This method is useful if as part of the testcase the
+   * cluster has to be shut down and restarted again.
+   * <p/>
+   * If the cluster is already running this method does nothing.
+   *
+   * @param reformatDFS indicates if DFS has to be reformatted
+   * @param props configuration properties to inject into the mini cluster
+   * @throws Exception if the cluster could not be started
+   */
+  protected synchronized void startCluster(boolean reformatDFS, Properties props)
+          throws Exception {
+    if (dfsCluster == null) {
+      JobConf conf = new JobConf();
+      if (props != null) {
+        for (Map.Entry entry : props.entrySet()) {
+          conf.set((String) entry.getKey(), (String) entry.getValue());
+        }
+      }
+      dfsCluster = new MiniDFSCluster(conf, 2, reformatDFS, null);
+      fileSystem = dfsCluster.getFileSystem();
+
+      ConfigurableMiniMRCluster.setConfiguration(props);
+      //noinspection deprecation
+      mrCluster = new ConfigurableMiniMRCluster(2, fileSystem.getName(), 1);
+    }
+  }
+
+  private static class ConfigurableMiniMRCluster extends MiniMRCluster {
+    private static Properties config;
+
+    public static void setConfiguration(Properties props) {
+      config = props;
+    }
+
+    public ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
+                                     int numDir) throws Exception {
+      super(numTaskTrackers, namenode, numDir);
+    }
+
+    public JobConf createJobConf() {
+      JobConf conf = super.createJobConf();
+      if (config != null) {
+        for (Map.Entry entry : config.entrySet()) {
+          conf.set((String) entry.getKey(), (String) entry.getValue());
+        }
+      }
+      return conf;
+    }
+  }
+
+  /**
+   * Stops the cluster within a testcase.
+   * <p/>
+   * Note that the cluster is already started when the testcase method
+   * is invoked. This method is useful if as part of the testcase the
+   * cluster has to be shut down.
+   * <p/>
+   * If the cluster is already stopped this method does nothing.
+   *
+   * @throws Exception if the cluster could not be stopped
+   */
+  protected void stopCluster() throws Exception {
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+      mrCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+      fileSystem = null;
+    }
+  }
+
+  /**
+   * Destroys the Hadoop MapReduce cluster and DFS after a test case is run.
+   *
+   * @throws Exception
+   */
+  protected void tearDown() throws Exception {
+    stopCluster();
+    super.tearDown();
+  }
+
+  /**
+   * Returns a preconfigured FileSystem instance for test cases to read and
+   * write files to it.
+   * <p/>
+   * TestCases should use this FileSystem instance.
+   *
+   * @return the filesystem used by Hadoop.
+   */
+  protected FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  /**
+   * Returns the path to the root directory for the testcase.
+   *
+   * @return path to the root directory for the testcase.
+   */
+  protected Path getTestRootDir() {
+    return new Path("x").getParent();
+  }
+
+  /**
+   * Returns a path to the input directory for the testcase.
+   *
+   * @return path to the input directory for the testcase.
+   */
+  protected Path getInputDir() {
+    return new Path("input");
+  }
+
+  /**
+   * Returns a path to the output directory for the testcase.
+   *
+   * @return path to the output directory for the testcase.
+   */
+  protected Path getOutputDir() {
+    return new Path("output");
+  }
+
+  /**
+   * Returns a job configuration preconfigured to run against the Hadoop
+   * cluster managed by the testcase.
+   *
+   * @return configuration that works on the testcase Hadoop instance
+   */
+  protected JobConf createJobConf() {
+    return mrCluster.createJobConf();
+  }
+
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java?rev=611527&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java Sat Jan 12 18:49:01 2008
@@ -0,0 +1,139 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.Properties;
+
+public class TestClusterMapReduceTestCase extends ClusterMapReduceTestCase {
+
+  public static class EchoMap implements Mapper {
+
+    public void configure(JobConf conf) {
+    }
+
+    public void close() {
+    }
+
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector collector, Reporter reporter)
+            throws IOException {
+      collector.collect(key, value);
+    }
+  }
+
+  public static class EchoReduce implements Reducer {
+
+    public void configure(JobConf conf) {
+    }
+
+    public void close() {
+    }
+
+    public void reduce(WritableComparable key, Iterator values,
+                       OutputCollector collector, Reporter reporter)
+            throws IOException {
+      while (values.hasNext()) {
+        Writable value = (Writable) values.next();
+        collector.collect(key, value);
+      }
+    }
+
+  }
+
+  public void _testMapReduce(boolean restart) throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("hello1\n");
+    wr.write("hello2\n");
+    wr.write("hello3\n");
+    wr.write("hello4\n");
+    wr.close();
+
+    if (restart) {
+      stopCluster();
+      startCluster(false, null);
+    }
+
+    JobConf conf = createJobConf();
+    conf.setJobName("mr");
+
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapperClass(TestClusterMapReduceTestCase.EchoMap.class);
+    conf.setReducerClass(TestClusterMapReduceTestCase.EchoReduce.class);
+
+    conf.setInputPath(getInputDir());
+
+    conf.setOutputPath(getOutputDir());
+
+    JobClient.runJob(conf);
+
+    Path[] outputFiles = getFileSystem().listPaths(getOutputDir());
+    assertTrue(outputFiles.length > 0);
+
+    InputStream is = getFileSystem().open(outputFiles[0]);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+    String line = reader.readLine();
+    int counter = 0;
+    while (line != null) {
+      counter++;
+      assertTrue(line.contains("hello"));
+      line = reader.readLine();
+    }
+    reader.close();
+    assertEquals(4, counter);
+  }
+
+  public void testMapReduce() throws Exception {
+    _testMapReduce(false);
+  }
+
+  public void testMapReduceRestarting() throws Exception {
+    _testMapReduce(true);
+  }
+
+  public void testDFSRestart() throws Exception {
+    Path file = new Path(getInputDir(), "text.txt");
+    OutputStream os = getFileSystem().create(file);
+    Writer wr = new OutputStreamWriter(os);
+    wr.close();
+
+    stopCluster();
+    startCluster(false, null);
+    assertTrue(getFileSystem().exists(file));
+
+    stopCluster();
+    startCluster(true, null);
+    assertFalse(getFileSystem().exists(file));
+  }
+
+  public void testMRConfig() throws Exception {
+    JobConf conf = createJobConf();
+    assertNull(conf.get("xyz"));
+
+    Properties config = new Properties();
+    config.setProperty("xyz", "XYZ");
+    stopCluster();
+    startCluster(false, config);
+
+    conf = createJobConf();
+    assertEquals("XYZ", conf.get("xyz"));
+  }
+
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=611527&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java Sat Jan 12 18:49:01 2008
@@ -0,0 +1,124 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.Path;
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.Properties;
+
+public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
+
+  public static class EchoMap implements Mapper {
+
+    public void configure(JobConf conf) {
+    }
+
+    public void close() {
+    }
+
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector collector, Reporter reporter)
+            throws IOException {
+      collector.collect(key, value);
+    }
+  }
+
+  public static class EchoReduce implements Reducer {
+
+    public void configure(JobConf conf) {
+    }
+
+    public void close() {
+    }
+
+    public void reduce(WritableComparable key, Iterator values,
+                       OutputCollector collector, Reporter reporter)
+            throws IOException {
+      while (values.hasNext()) {
+        Writable value = (Writable) values.next();
+        collector.collect(key, value);
+      }
+    }
+
+  }
+
+  private String runJob() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("hello1\n");
+    wr.write("hello2\n");
+    wr.write("hello3\n");
+    wr.write("hello4\n");
+    wr.close();
+
+    JobConf conf = createJobConf();
+    conf.setJobName("mr");
+
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapperClass(TestJobStatusPersistency.EchoMap.class);
+    conf.setReducerClass(TestJobStatusPersistency.EchoReduce.class);
+
+    conf.setInputPath(getInputDir());
+
+    conf.setOutputPath(getOutputDir());
+
+    return JobClient.runJob(conf).getJobID();
+  }
+
+  public void testNonPersistency() throws Exception {
+    String jobId = runJob();
+    JobClient jc = new JobClient(createJobConf());
+    RunningJob rj = jc.getJob(jobId);
+    assertNotNull(rj);
+    stopCluster();
+    startCluster(false, null);
+    jc = new JobClient(createJobConf());
+    rj = jc.getJob(jobId);
+    assertNull(rj);
+  }
+
+  public void testPersistency() throws Exception {
+    Properties config = new Properties();
+    config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
+    config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
+    stopCluster();
+    startCluster(false, config);
+    String jobId = runJob();
+    JobClient jc = new JobClient(createJobConf());
+    RunningJob rj0 = jc.getJob(jobId);
+    assertNotNull(rj0);
+    boolean successful0 = rj0.isSuccessful();
+    String jobName0 = rj0.getJobName();
+    Counters counters0 = rj0.getCounters();
+    TaskCompletionEvent[] events0 = rj0.getTaskCompletionEvents(0);
+
+    stopCluster();
+    startCluster(false, config);
+
+    jc = new JobClient(createJobConf());
+    RunningJob rj1 = jc.getJob(jobId);
+    assertNotNull(rj1);
+    assertEquals(successful0, rj1.isSuccessful());
+    assertEquals(jobName0, rj1.getJobName());
+    assertEquals(counters0.size(), rj1.getCounters().size());
+
+    TaskCompletionEvent[] events1 = rj1.getTaskCompletionEvents(0);
+    assertEquals(events0.length, events1.length);
+    for (int i = 0; i < events0.length; i++) {
+      assertEquals(events0[i].getTaskId(), events1[i].getTaskId());
+      assertEquals(events0[i].getTaskStatus(), events1[i].getTaskStatus());
+    }
+  }
+
+}
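Finally, for operators: with persistence enabled, the stored records can be
inspected directly in DFS. A quick sanity check, assuming the default value of
mapred.job.tracker.persist.jobstatus.dir:

  bin/hadoop dfs -ls /jobtracker/jobsInfo

should list one <jobId>.info file per persisted job (the naming comes from
getInfoFilePath() in CompletedJobStatusStore); the housekeeping thread deletes
each file once its modification time is older than the configured number of
hours.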