Author: ddas
Date: Fri May 22 15:23:40 2009
New Revision: 777570
URL: http://svn.apache.org/viewvc?rev=777570&view=rev
Log:
HADOOP-5850. Fixes a problem with not being able to run jobs with 0
maps/reduces. Contributed by Vinod K V.
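
For context, a minimal sketch of the kind of job this commit fixes, assuming a JobConf wired to a running cluster as in the new TestEmptyJob below (EmptyInputFormat is the test helper, used by TestEmptyJob, whose getSplits() returns no splits):

    JobConf conf = new JobConf();
    conf.setJobName("empty");
    conf.setInputFormat(EmptyInputFormat.class); // yields zero input splits
    conf.setNumMapTasks(0);
    conf.setNumReduceTasks(0);
    // Before this patch, initTasks() short-circuited numMapTasks == 0 jobs
    // and never created their setup/cleanup tips; with this change such a
    // job runs through the normal lifecycle and completes successfully.
    RunningJob job = new JobClient(conf).submitJob(conf);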
Added:
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java
Removed:
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
hadoop/core/trunk/src/webapps/job/taskdetails.jsp
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=777570&r1=777569&r2=777570&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri May 22 15:23:40 2009
@@ -735,6 +735,9 @@
HADOOP-5210. Solves a problem in the progress report of the reduce task.
(Ravi Gummadi via ddas)
+ HADOOP-5850. Fixes a problem with not being able to run jobs with
+ 0 maps/reduces. (Vinod K V via ddas)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=777570&r1=777569&r2=777570&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri May 22 15:23:40 2009
@@ -385,7 +385,7 @@
jobInitKillStatus.initStarted = true;
}
- LOG.debug("initializing " + this.jobId);
+ LOG.info("Initializing " + jobId);
// log job info
JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(),
@@ -428,38 +428,15 @@
splits[i],
jobtracker, conf, this, i);
}
- LOG.info("Input size for job "+ jobId + " = " + inputLength);
+ LOG.info("Input size for job " + jobId + " = " + inputLength
+ + ". Number of splits = " + splits.length);
if (numMapTasks > 0) {
- LOG.info("Split info for job:" + jobId + " with " +
- splits.length + " splits:");
nonRunningMapCache = createCache(splits, maxLevel);
}
// set the launch time
this.launchTime = System.currentTimeMillis();
- // if no split is returned, job is considered completed and successful
- if (numMapTasks == 0) {
- // Finished time need to be setted here to prevent this job to be retired
- // from the job tracker jobs at the next retire iteration.
- this.finishTime = this.launchTime;
- status.setSetupProgress(1.0f);
- status.setMapProgress(1.0f);
- status.setReduceProgress(1.0f);
- status.setCleanupProgress(1.0f);
- status.setRunState(JobStatus.SUCCEEDED);
- tasksInited.set(true);
- JobHistory.JobInfo.logInited(profile.getJobID(),
- this.launchTime, 0, 0);
- JobHistory.JobInfo.logFinished(profile.getJobID(),
- this.finishTime, 0, 0, 0, 0,
- getCounters());
- // Special case because the Job is not queued
- JobEndNotifier.registerNotification(this.getJobConf(), this.getStatus());
-
- return;
- }
-
//
// Create reduce tasks
//
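
The block deleted above was the core of the bug; a condensed sketch of the old control flow (names as in the removed lines):

    // Old flow (removed above, condensed): a job with no map tasks was
    // declared SUCCEEDED inside initTasks() and returned early, so its
    // setup and cleanup tips were never created, let alone run.
    if (numMapTasks == 0) {
      status.setRunState(JobStatus.SUCCEEDED);
      JobEndNotifier.registerNotification(getJobConf(), getStatus());
      return;
    }
    // New flow: fall through, build the setup/cleanup (and any reduce)
    // tips, and let the job finish through the ordinary lifecycle.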
@@ -481,9 +458,11 @@
// create cleanup two cleanup tips, one map and one reduce.
cleanup = new TaskInProgress[2];
- // cleanup map tip. This map is doesn't use split.
- // Just assign splits[0]
- cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
+
+ // cleanup map tip. This map doesn't use any splits. Just assign an empty
+ // split.
+ JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+ cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks);
cleanup[0].setJobCleanupTask();
@@ -494,9 +473,10 @@
// create two setup tips, one map and one reduce.
setup = new TaskInProgress[2];
- // setup map tip. This map is doesn't use split.
- // Just assign splits[0]
- setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
+
+ // setup map tip. This map doesn't use any split. Just assign an empty
+ // split.
+ setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks + 1 );
setup[0].setJobSetupTask();
@@ -889,20 +869,11 @@
if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
double progressDelta = tip.getProgress() - oldProgress;
if (tip.isMapTask()) {
- if (maps.length == 0) {
- this.status.setMapProgress(1.0f);
- } else {
this.status.setMapProgress((float) (this.status.mapProgress() +
progressDelta / maps.length));
- }
} else {
- if (reduces.length == 0) {
- this.status.setReduceProgress(1.0f);
- } else {
- this.status.setReduceProgress
- ((float) (this.status.reduceProgress() +
- (progressDelta / reduces.length)));
- }
+ this.status.setReduceProgress((float) (this.status.reduceProgress() +
+ (progressDelta / reduces.length)));
}
}
}
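
The zero-length guards removed here are not lost: setup and cleanup tips are excluded by the enclosing condition, so progress deltas only ever arrive for real map/reduce tips, and the empty-list cases are now handled once in the job-completion hunk below. A sketch of the resulting invariant, in illustrative assertion form (not part of the patch):

    // After this patch, a SUCCEEDED job reports full progress on every
    // phase, even when it had no maps or no reduces to accumulate from.
    assert status.getRunState() != JobStatus.SUCCEEDED
        || (status.mapProgress() == 1.0f && status.reduceProgress() == 1.0f);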
@@ -1129,8 +1100,10 @@
status.getRunState() != JobStatus.PREP) {
return false;
}
- // check if cleanup task has been launched already.
- if (launchedCleanup) {
+ // check if cleanup task has been launched already or if setup isn't
+ // finished yet. The latter check is useful when the number of maps is
+ // zero.
+ if (launchedCleanup || !isSetupFinished()) {
return false;
}
// check if job has failed or killed
@@ -1164,7 +1137,6 @@
if (!canLaunchSetupTask()) {
return null;
}
-
String taskTracker = tts.getTrackerName();
// Update the last-known clusterSize
this.clusterSize = clusterSize;
@@ -2111,6 +2083,12 @@
if (this.status.getRunState() == JobStatus.RUNNING ) {
this.status.setRunState(JobStatus.SUCCEEDED);
this.status.setCleanupProgress(1.0f);
+ if (maps.length == 0) {
+ this.status.setMapProgress(1.0f);
+ }
+ if (reduces.length == 0) {
+ this.status.setReduceProgress(1.0f);
+ }
this.finishTime = System.currentTimeMillis();
LOG.info("Job " + this.status.getJobID() +
" has completed successfully.");
@@ -2428,6 +2406,14 @@
}
}
+ boolean isSetupFinished() {
+ if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
+ || setup[1].isFailed()) {
+ return true;
+ }
+ return false;
+ }
+
/**
* Fail a task with a given reason, but without a status object.
*
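
The new isSetupFinished() helper is what canLaunchCleanupTask() above consults; with zero maps there is no map attempt between setup and cleanup, so the ordering must be enforced explicitly. Condensed alongside the method's other checks (sketch):

    // Cleanup may only launch once setup has finished (completed or failed).
    boolean mayLaunchCleanup = !launchedCleanup && isSetupFinished();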
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=777570&r1=777569&r2=777570&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri May 22 15:23:40 2009
@@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobClient.RawSplit;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.TaskType;
@@ -731,7 +732,10 @@
* Get the split locations
*/
public String[] getSplitLocations() {
- return rawSplit.getLocations();
+ if (isMapTask() && !jobSetup && !jobCleanup) {
+ return rawSplit.getLocations();
+ }
+ return new String[0];
}
/**
@@ -915,12 +919,18 @@
boolean taskCleanup) {
// create the task
Task t = null;
- if (isMapTask()) {
- LOG.debug("attempt "+ numTaskFailures +
- " sending skippedRecords "+failedRanges.getIndicesCount());
- t = new MapTask(jobFile, taskid, partition,
- rawSplit.getClassName(), rawSplit.getBytes());
- } else {
+ if (isMapTask() && !jobSetup && !jobCleanup) {
+ LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
+ + failedRanges.getIndicesCount());
+
+ t =
+ new MapTask(jobFile, taskid, partition, rawSplit.getClassName(),
+ rawSplit.getBytes());
+
+ } else if (jobSetup || jobCleanup) {
+ t = new MapTask(jobFile, taskid, partition, null, new BytesWritable());
+ } else {
t = new ReduceTask(jobFile, taskid, partition, numMaps);
}
if (jobCleanup) {
@@ -1029,7 +1039,7 @@
* Gets the Node list of input split locations sorted in rack order.
*/
public String getSplitNodes() {
- if ( rawSplit == null) {
+ if (!isMapTask() || jobSetup || jobCleanup) {
return "";
}
String[] splits = rawSplit.getLocations();
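
getSplitLocations() and getSplitNodes() now guard on task kind rather than on rawSplit being null. The recurring predicate could be read as a hypothetical helper (not in the patch):

    // Split-derived information only makes sense for a real map tip, never
    // for reduces or for setup/cleanup map tips with their empty splits.
    boolean hasRealSplit() {
      return isMapTask() && !jobSetup && !jobCleanup;
    }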
Added:
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java?rev=777570&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java Fri May 22 15:23:40 2009
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * A JUnit test to test Map-Reduce empty jobs.
+ */
+public class TestEmptyJob extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestEmptyJob.class.getName());
+
+ private static String TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data", "/tmp")).toURI()
+ .toString().replace(' ', '+');
+
+ MiniMRCluster mr = null;
+
+ /**
+ * Simple method running a MapReduce job with no input data. Used to test that
+ * such a job is successful.
+ *
+ * @param fileSys
+ * @param numMaps
+ * @param numReduces
+ * @return true if the MR job is successful, otherwise false
+ * @throws IOException
+ */
+ private boolean launchEmptyJob(URI fileSys, int numMaps, int numReduces)
+ throws IOException {
+ // create an empty input dir
+ final Path inDir = new Path(TEST_ROOT_DIR, "testing/empty/input");
+ final Path outDir = new Path(TEST_ROOT_DIR, "testing/empty/output");
+ JobConf conf = mr.createJobConf();
+ FileSystem fs = FileSystem.get(fileSys, conf);
+ fs.delete(outDir, true);
+ if (!fs.mkdirs(inDir)) {
+ LOG.warn("Can't create " + inDir);
+ return false;
+ }
+
+ // use WordCount example
+ FileSystem.setDefaultUri(conf, fileSys);
+ conf.setJobName("empty");
+ // use an InputFormat which returns no split
+ conf.setInputFormat(EmptyInputFormat.class);
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(IntWritable.class);
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.setNumMapTasks(numMaps);
+ conf.setNumReduceTasks(numReduces);
+
+ // run job and wait for completion
+ JobClient jc = new JobClient(conf);
+ RunningJob runningJob = jc.submitJob(conf);
+ while (true) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ if (runningJob.isComplete()) {
+ break;
+ }
+ }
+
+ assertTrue(runningJob.isComplete());
+ assertTrue(runningJob.isSuccessful());
+ JobID jobID = runningJob.getID();
+
+ TaskReport[] jobSetupTasks = jc.getSetupTaskReports(jobID);
+ assertTrue("Number of job-setup tips is not 2!", jobSetupTasks.length == 2);
+ assertTrue("Setup progress is " + runningJob.setupProgress()
+ + " and not 1.0", runningJob.setupProgress() == 1.0);
+ assertTrue("Setup task is not finished!", mr.getJobTrackerRunner()
+ .getJobTracker().getJob(jobID).isSetupFinished());
+
+ assertTrue("Number of maps is not zero!", jc.getMapTaskReports(runningJob
+ .getID()).length == 0);
+ assertTrue(
+ "Map progress is " + runningJob.mapProgress() + " and not 1.0!",
+ runningJob.mapProgress() == 1.0);
+
+ assertTrue("Reduce progress is " + runningJob.reduceProgress()
+ + " and not 1.0!", runningJob.reduceProgress() == 1.0);
+ assertTrue("Number of reduces is not " + numReduces, jc
+ .getReduceTaskReports(runningJob.getID()).length == numReduces);
+
+ TaskReport[] jobCleanupTasks = jc.getCleanupTaskReports(jobID);
+ assertTrue("Number of job-cleanup tips is not 2!",
+ jobCleanupTasks.length == 2);
+ assertTrue("Cleanup progress is " + runningJob.cleanupProgress()
+ + " and not 1.0", runningJob.cleanupProgress() == 1.0);
+
+ assertTrue("Job output directory doesn't exist!", fs.exists(outDir));
+ FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
+ assertTrue("Number of part-files is " + list.length + " and not "
+ + numReduces, list.length == numReduces);
+
+ // cleanup
+ fs.delete(outDir, true);
+
+ // return job result
+ LOG.info("job is complete: " + runningJob.isSuccessful());
+ return (runningJob.isSuccessful());
+ }
+
+ /**
+ * Test that a job with no input data (and thus with no input split and no map
+ * task to execute) is successful.
+ *
+ * @throws IOException
+ */
+ public void testEmptyJob()
+ throws IOException {
+ FileSystem fileSys = null;
+ try {
+ final int taskTrackers = 1;
+ JobConf conf = new JobConf();
+ fileSys = FileSystem.get(conf);
+
+ conf.set("mapred.job.tracker.handler.count", "1");
+ conf.set("mapred.job.tracker", "127.0.0.1:0");
+ conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+ conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
+
+ mr =
+ new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
+ null, null, conf);
+
+ assertTrue(launchEmptyJob(fileSys.getUri(), 3, 1));
+ assertTrue(launchEmptyJob(fileSys.getUri(), 0, 0));
+ } finally {
+ if (fileSys != null) {
+ fileSys.close();
+ }
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+}
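
The two launchEmptyJob() calls cover a job with a reduce but no input splits, and a fully empty job. Since the new test is a JUnit 3 TestCase, it can also be driven directly with the stock text runner (a usage sketch; in trunk it would normally run through the ant test targets):

    // Runs TestEmptyJob (including testEmptyJob()) with the JUnit 3 runner.
    junit.textui.TestRunner.run(TestEmptyJob.class);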
Modified:
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=777570&r1=777569&r2=777570&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java Fri May 22 15:23:40 2009
@@ -37,7 +37,7 @@
//end of the job (indirectly testing whether all tasktrackers
//got a KillJobAction).
private static final Log LOG =
- LogFactory.getLog(TestEmptyJobWithDFS.class.getName());
+ LogFactory.getLog(TestEmptyJob.class.getName());
private void runSleepJob(JobConf conf) throws Exception {
String[] args = { "-m", "1", "-r", "10", "-mt", "1000", "-rt", "10000" };
ToolRunner.run(conf, new SleepJob(), args);
Modified: hadoop/core/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/taskdetails.jsp?rev=777570&r1=777569&r2=777570&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/taskdetails.jsp Fri May 22 15:23:40 2009
@@ -278,7 +278,7 @@
</center>
<%
- if (ts[0].getIsMap()) {
+ if (ts[0].getIsMap() && !isCleanupOrSetup) {
%>
<h3>Input Split Locations</h3>
<table border=2 cellpadding="5" cellspacing="2">
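
The JSP guard mirrors the TaskInProgress accessor changes: the split-locations table is rendered only for real map attempts. A condensed sketch, assuming isCleanupOrSetup is computed earlier in the page (not shown in this hunk):

    // Only real map tasks get the "Input Split Locations" table.
    boolean showSplitLocations = ts[0].getIsMap() && !isCleanupOrSetup;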