Author: ddas
Date: Wed Jun 3 03:31:58 2009
New Revision: 781248
URL: http://svn.apache.org/viewvc?rev=781248&view=rev
Log:
HADOOP-5908. Fixes a problem to do with ArithmeticException in the JobTracker
when there are jobs with 0 maps. Contributed by Amar Kamat.
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestEmptyJob.java
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/UtilsForTests.java
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=781248&r1=781247&r2=781248&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Wed Jun 3 03:31:58 2009
@@ -107,6 +107,9 @@
HADOOP-5648. Fixes a build issue in not being able to generate gridmix.jar
in hadoop binary tarball. (Giridharan Kesavan via gkesavan)
+ HADOOP-5908. Fixes a problem to do with ArithmeticException in the
+ JobTracker when there are jobs with 0 maps. (Amar Kamat via ddas)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified:
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=781248&r1=781247&r2=781248&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Wed Jun 3 03:31:58 2009
@@ -583,6 +583,14 @@
return inputLength;
}
+ boolean isCleanupLaunched() {
+ return launchedCleanup;
+ }
+
+ boolean isSetupLaunched() {
+ return launchedSetup;
+ }
+
/**
* Get the list of map tasks
* @return the raw array of maps for this job
@@ -1666,6 +1674,11 @@
final int numUniqueHosts,
final int maxCacheLevel,
final double avgProgress) {
+ if (numMapTasks == 0) {
+ LOG.info("No maps to schedule for " + profile.getJobID());
+ return -1;
+ }
+
String taskTracker = tts.getTrackerName();
TaskInProgress tip = null;
@@ -1871,6 +1884,11 @@
int clusterSize,
int numUniqueHosts,
double avgProgress) {
+ if (numReduceTasks == 0) {
+ LOG.info("No reduces to schedule for " + profile.getJobID());
+ return -1;
+ }
+
String taskTracker = tts.getTrackerName();
TaskInProgress tip = null;
Modified:
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java?rev=781248&r1=781247&r2=781248&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
(original)
+++
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
Wed Jun 3 03:31:58 2009
@@ -82,7 +82,10 @@
* @return estimated length of this job's average map output
*/
long getEstimatedMapOutputSize() {
- long estimate = getEstimatedTotalMapOutputSize() / job.desiredMaps();
+ long estimate = 0L;
+ if (job.desiredMaps() > 0) {
+ estimate = getEstimatedTotalMapOutputSize() / job.desiredMaps();
+ }
return estimate;
}
Modified:
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestEmptyJob.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestEmptyJob.java?rev=781248&r1=781247&r2=781248&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestEmptyJob.java
(original)
+++
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestEmptyJob.java
Wed Jun 3 03:31:58 2009
@@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -47,6 +48,26 @@
MiniMRCluster mr = null;
+ /** Committer with cleanup waiting on a signal
+ */
+ static class CommitterWithDelayCleanup extends FileOutputCommitter {
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ Path share = new Path(conf.get("share"));
+ FileSystem fs = FileSystem.get(conf);
+
+
+ while (true) {
+ if (fs.exists(share)) {
+ break;
+ }
+ UtilsForTests.waitFor(100);
+ }
+ super.cleanupJob(context);
+ }
+ }
+
/**
* Simple method running a MapReduce job with no input data. Used to test that
* such a job is successful.
@@ -62,8 +83,13 @@
// create an empty input dir
final Path inDir = new Path(TEST_ROOT_DIR, "testing/empty/input");
final Path outDir = new Path(TEST_ROOT_DIR, "testing/empty/output");
+ final Path inDir2 = new Path(TEST_ROOT_DIR, "testing/dummy/input");
+ final Path outDir2 = new Path(TEST_ROOT_DIR, "testing/dummy/output");
+ final Path share = new Path(TEST_ROOT_DIR, "share");
+
JobConf conf = mr.createJobConf();
FileSystem fs = FileSystem.get(fileSys, conf);
+ fs.delete(new Path(TEST_ROOT_DIR), true);
fs.delete(outDir, true);
if (!fs.mkdirs(inDir)) {
LOG.warn("Can't create " + inDir);
@@ -75,6 +101,7 @@
conf.setJobName("empty");
// use an InputFormat which returns no split
conf.setInputFormat(EmptyInputFormat.class);
+ conf.setOutputCommitter(CommitterWithDelayCleanup.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(IdentityMapper.class);
@@ -83,11 +110,53 @@
FileOutputFormat.setOutputPath(conf, outDir);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReduces);
+ conf.set("share", share.toString());
// run job and wait for completion
JobClient jc = new JobClient(conf);
RunningJob runningJob = jc.submitJob(conf);
+ JobInProgress job = mr.getJobTrackerRunner().getJobTracker().getJob(runningJob.getID());
+
+ while (true) {
+ if (job.isCleanupLaunched()) {
+ LOG.info("Waiting for cleanup to be launched for job "
+ + runningJob.getID());
+ break;
+ }
+ UtilsForTests.waitFor(100);
+ }
+
+ // submit another job so that the map load increases and scheduling happens
+ LOG.info("Launching dummy job ");
+ RunningJob dJob = null;
+ try {
+ JobConf dConf = new JobConf(conf);
+ dConf.setOutputCommitter(FileOutputCommitter.class);
+ dJob = UtilsForTests.runJob(dConf, inDir2, outDir2, 2, 0);
+ } catch (Exception e) {
+ LOG.info("Exception ", e);
+ throw new IOException(e);
+ }
+
+ while (true) {
+ LOG.info("Waiting for job " + dJob.getID() + " to complete");
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ if (dJob.isComplete()) {
+ break;
+ }
+ }
+
+ // check if the second job is successful
+ assertTrue(dJob.isSuccessful());
+
+ // signal the cleanup
+ fs.create(share).close();
+
while (true) {
+ LOG.info("Waiting for job " + runningJob.getID() + " to complete");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
@@ -148,7 +217,7 @@
throws IOException {
FileSystem fileSys = null;
try {
- final int taskTrackers = 1;
+ final int taskTrackers = 2;
JobConf conf = new JobConf();
fileSys = FileSystem.get(conf);
Modified:
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=781248&r1=781247&r2=781248&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/UtilsForTests.java
(original)
+++
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/UtilsForTests.java
Wed Jun 3 03:31:58 2009
@@ -535,6 +535,12 @@
// Start a job and return its RunningJob object
static RunningJob runJob(JobConf conf, Path inDir, Path outDir)
throws IOException {
+ return runJob(conf, inDir, outDir, conf.getNumMapTasks(), conf.getNumReduceTasks());
+ }
+
+ // Start a job and return its RunningJob object
+ static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
+ int numReds) throws IOException {
FileSystem fs = FileSystem.get(conf);
fs.delete(outDir, true);
@@ -543,9 +549,11 @@
}
String input = "The quick brown fox\n" + "has many silly\n"
+ "red fox sox\n";
- DataOutputStream file = fs.create(new Path(inDir, "part-0"));
- file.writeBytes(input);
- file.close();
+ for (int i = 0; i < numMaps; ++i) {
+ DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+ file.writeBytes(input);
+ file.close();
+ }
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
@@ -553,8 +561,8 @@
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
- conf.setNumMapTasks(1);
- conf.setNumReduceTasks(1);
+ conf.setNumMapTasks(numMaps);
+ conf.setNumReduceTasks(numReds);
JobClient jobClient = new JobClient(conf);
RunningJob job = jobClient.submitJob(conf);