Author: cutting
Date: Wed Jul 11 13:10:44 2007
New Revision: 555383

URL: http://svn.apache.org/viewvc?view=rev&rev=555383
Log:
HADOOP-1473. Make job ids unique across jobtracker restarts. Contributed by Owen.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=555383&r1=555382&r2=555383 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Jul 11 13:10:44 2007 @@ -306,6 +306,9 @@ 94. HADOOP-1584. Fix a bug in GenericWritable which limited it to 128 types instead of 256. (Espen Amble Kolstad via cutting) + 95. HADOOP-1473. Make job ids unique across jobtracker restarts. 
+ (omalley via cutting) + Release 0.13.0 - 2007-06-08 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=555383&r1=555382&r2=555383 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Jul 11 13:10:44 2007 @@ -96,7 +96,7 @@ boolean tasksInited = false; private LocalFileSystem localFs; - private String uniqueString; + private String jobId; // Per-job counters public static enum Counter { @@ -116,23 +116,24 @@ */ public JobInProgress(String jobFile, JobTracker jobtracker, Configuration default_conf) throws IOException { - uniqueString = jobtracker.createUniqueId(); - String jobid = "job_" + uniqueString; - String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid; + jobId = jobtracker.getTrackerIdentifier() + "_" +jobtracker.createJobId(); + String fullJobId = "job_" + jobId; + String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + fullJobId; this.jobtracker = jobtracker; - this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP); + this.status = new JobStatus(fullJobId, 0.0f, 0.0f, JobStatus.PREP); this.startTime = System.currentTimeMillis(); this.localFs = (LocalFileSystem)FileSystem.getLocal(default_conf); JobConf default_job_conf = new JobConf(default_conf); this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR - +"/"+jobid + ".xml"); + +"/"+fullJobId + ".xml"); this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR - +"/"+ jobid + ".jar"); + +"/"+ fullJobId + ".jar"); FileSystem fs = FileSystem.get(default_conf); fs.copyToLocalFile(new Path(jobFile), 
localJobFile); conf = new JobConf(localJobFile); - this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url, + this.profile = new JobProfile(conf.getUser(), fullJobId, jobFile, url, conf.getJobName()); String jarFile = conf.getJar(); if (jarFile != null) { @@ -142,13 +143,13 @@ this.numMapTasks = conf.getNumMapTasks(); this.numReduceTasks = conf.getNumReduceTasks(); - this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>( - numMapTasks + numReduceTasks + 10); + this.taskCompletionEvents = new ArrayList<TaskCompletionEvent> + (numMapTasks + numReduceTasks + 10); this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent(); this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent(); - JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), + JobHistory.JobInfo.logSubmitted(fullJobId, conf.getJobName(), conf.getUser(), System.currentTimeMillis(), jobFile); MetricsContext metricsContext = MetricsUtil.getContext("mapred"); @@ -156,7 +157,7 @@ this.jobMetrics.setTag("user", conf.getUser()); this.jobMetrics.setTag("sessionId", conf.getSessionId()); this.jobMetrics.setTag("jobName", conf.getJobName()); - this.jobMetrics.setTag("jobId", jobid); + this.jobMetrics.setTag("jobId", fullJobId); } /** @@ -219,7 +220,7 @@ numMapTasks = splits.length; maps = new TaskInProgress[numMapTasks]; for(int i=0; i < numMapTasks; ++i) { - maps[i] = new TaskInProgress(uniqueString, jobFile, + maps[i] = new TaskInProgress(jobId, jobFile, splits[i].getClassName(), splits[i].getBytes(), jobtracker, conf, this, i); @@ -254,7 +255,7 @@ // this.reduces = new TaskInProgress[numReduceTasks]; for (int i = 0; i < numReduceTasks; i++) { - reduces[i] = new TaskInProgress(uniqueString, jobFile, + reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf, this); } @@ -1071,7 +1072,7 @@ // Delete temp dfs dirs created if any, like in case of // speculative exn of reduces. 
- String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString; + String tempDir = conf.get("mapred.system.dir") + "/job_" + jobId; fs.delete(new Path(tempDir)); } catch (IOException e) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=555383&r1=555382&r2=555383 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Jul 11 13:10:44 2007 @@ -21,16 +21,17 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.text.NumberFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; +import java.util.Date; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -492,6 +493,7 @@ //////////////////////////////////////////////////////////////// int port; String localMachine; + private String trackerIdentifier; long startTime; int totalSubmissions = 0; @@ -651,6 +653,8 @@ this.infoServer.start(); this.startTime = System.currentTimeMillis(); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm"); + trackerIdentifier = dateFormat.format(new Date()); myMetrics = new JobTrackerMetrics(jobConf); this.expireTrackersThread = new Thread(this.expireTrackers, @@ -973,6 +977,15 @@ public String getJobTrackerMachine() { return localMachine; } + + /** + * Get the unique identifier (ie. timestamp) of this job tracker start. 
+ * @return a string with a unique identifier + */ + public String getTrackerIdentifier() { + return trackerIdentifier; + } + public int getTrackerPort() { return port; } @@ -1633,7 +1646,7 @@ /** * Grab random num for job id */ - String createUniqueId() { + String createJobId() { return idFormat.format(nextJobId++); } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java?view=diff&rev=555383&r1=555382&r2=555383 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java Wed Jul 11 13:10:44 2007 @@ -124,8 +124,18 @@ } } - public static boolean launchMRCache(String indir, - String outdir, String cacheDir, JobConf conf, String input) + public static class TestResult { + public RunningJob job; + public boolean isOutputOk; + TestResult(RunningJob job, boolean isOutputOk) { + this.job = job; + this.isOutputOk = isOutputOk; + } + } + + public static TestResult launchMRCache(String indir, + String outdir, String cacheDir, + JobConf conf, String input) throws IOException { String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp")) .toString().replace(' ', '+'); @@ -197,7 +207,7 @@ DistributedCache.addCacheArchive(uri1, conf); DistributedCache.addCacheArchive(uri2, conf); DistributedCache.addCacheFile(uri3, conf); - JobClient.runJob(conf); + RunningJob job = JobClient.runJob(conf); int count = 0; // after the job ran check to see if the the input from the localized cache // match the real string. check if there are 3 instances or not. 
@@ -208,7 +218,7 @@ String line = file.readLine(); while (line != null) { if (!testStr.equals(line)) - return false; + return new TestResult(job, false); count++; line = file.readLine(); @@ -216,9 +226,9 @@ file.close(); } if (count != 3) - return false; + return new TestResult(job, false); - return true; + return new TestResult(job, true); } } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=diff&rev=555383&r1=555382&r2=555383 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Wed Jul 11 13:10:44 2007 @@ -22,6 +22,7 @@ import junit.framework.TestCase; import org.apache.hadoop.dfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.MRCaching.TestResult; /** * A JUnit test to test caching with DFS @@ -39,13 +40,13 @@ fileSys = dfs.getFileSystem(); mr = new MiniMRCluster(2, fileSys.getName(), 4); // run the wordcount example with caching - boolean ret = MRCaching.launchMRCache("/testing/wc/input", + TestResult ret = MRCaching.launchMRCache("/testing/wc/input", "/testing/wc/output", "/cachedir", mr.createJobConf(), "The quick brown fox\nhas many silly\n" + "red fox sox\n"); - assertTrue("Archives not matching", ret); + assertTrue("Archives not matching", ret.isOutputOk); } finally { if (fileSys != null) { fileSys.close(); Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=555383&r1=555382&r2=555383 ============================================================================== --- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Wed Jul 11 13:10:44 2007 @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.MRCaching.TestResult; import org.apache.hadoop.util.Progressable; import java.io.DataInput; @@ -52,7 +53,7 @@ assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01)); // run the wordcount example with caching JobConf job = mr.createJobConf(); - boolean ret = MRCaching.launchMRCache(TEST_ROOT_DIR + "/wc/input", + TestResult ret = MRCaching.launchMRCache(TEST_ROOT_DIR + "/wc/input", TEST_ROOT_DIR + "/wc/output", TEST_ROOT_DIR + "/cachedir", job, @@ -60,12 +61,13 @@ + "has many silly\n" + "red fox sox\n"); // assert the number of lines read during caching - assertTrue("Failed test archives not matching", ret); + assertTrue("Failed test archives not matching", ret.isOutputOk); // test the task report fetchers JobClient client = new JobClient(job); - TaskReport[] reports = client.getMapTaskReports("job_0001"); - assertEquals("number of maps", 10, reports.length); - reports = client.getReduceTaskReports("job_0001"); + String jobid = ret.job.getJobID(); + TaskReport[] reports = client.getMapTaskReports(jobid); + assertEquals("number of maps", 1, reports.length); + reports = client.getReduceTaskReports(jobid); assertEquals("number of reduces", 1, reports.length); runCustomFormats(mr); } finally { Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=555383&r1=555382&r2=555383 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original) +++ 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Wed Jul 11 13:10:44 2007 @@ -42,10 +42,18 @@ static final int NUM_MAPS = 10; static final int NUM_SAMPLES = 100000; - public static String launchWordCount(JobConf conf, - String input, - int numMaps, - int numReduces) throws IOException { + public static class TestResult { + public String output; + public RunningJob job; + TestResult(RunningJob job, String output) { + this.job = job; + this.output = output; + } + } + public static TestResult launchWordCount(JobConf conf, + String input, + int numMaps, + int numReduces) throws IOException { final Path inDir = new Path("/testing/wc/input"); final Path outDir = new Path("/testing/wc/output"); FileSystem fs = FileSystem.get(conf); @@ -73,8 +81,8 @@ conf.setOutputPath(outDir); conf.setNumMapTasks(numMaps); conf.setNumReduceTasks(numReduces); - JobClient.runJob(conf); - return readOutput(outDir, conf); + RunningJob job = JobClient.runJob(conf); + return new TestResult(job, readOutput(outDir, conf)); } public static String readOutput(Path outDir, @@ -167,19 +175,21 @@ // Run a word count example JobConf jobConf = mr.createJobConf(); // Keeping tasks that match this pattern - jobConf.setKeepTaskFilesPattern("task_[0-9]*_m_000001_.*"); - String result; + jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*"); + TestResult result; result = launchWordCount(jobConf, "The quick brown fox\nhas many silly\n" + "red fox sox\n", 3, 1); assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" + - "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result); - checkTaskDirectories(mr, new String[]{"job_0002"}, new String[]{"task_0002_m_000001_0"}); + "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output); + String jobid = result.job.getJobID(); + String taskid = "task_" + jobid.substring(4) + "_m_000001_0"; + checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid}); // test with maps=0 jobConf = mr.createJobConf(); result = launchWordCount(jobConf, 
"owen is oom", 0, 1); - assertEquals("is\t1\noom\t1\nowen\t1\n", result); + assertEquals("is\t1\noom\t1\nowen\t1\n", result.output); } finally { if (fileSys != null) { fileSys.close(); } if (dfs != null) { dfs.shutdown(); }