Author: tomwhite
Date: Wed Jan 9 15:01:10 2013
New Revision: 1430876
URL: http://svn.apache.org/viewvc?rev=1430876&view=rev
Log:
MAPREDUCE-4850. Job recovery may fail if staging directory has been deleted.
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1430876&r1=1430875&r2=1430876&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Wed Jan 9 15:01:10 2013
@@ -413,6 +413,9 @@ Release 1.2.0 - unreleased
HADOOP-9191. TestAccessControlList and TestJobHistoryConfig fail with
JDK7. (Arpit Agarwal via suresh)
+ MAPREDUCE-4850. Job recovery may fail if staging directory has been
+ deleted. (tomwhite)
+
Release 1.1.2 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1430876&r1=1430875&r2=1430876&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Wed Jan 9 15:01:10 2013
@@ -56,10 +56,17 @@ public class CleanupQueue {
static class PathDeletionContext {
final Path fullPath;// full path of file or dir
final Configuration conf;
+ final UserGroupInformation ugi;
public PathDeletionContext(Path fullPath, Configuration conf) {
+ this(fullPath, conf, null);
+ }
+
+ public PathDeletionContext(Path fullPath, Configuration conf,
+ UserGroupInformation ugi) {
this.fullPath = fullPath;
this.conf = conf;
+ this.ugi = ugi;
}
protected Path getPathForCleanup() {
@@ -72,7 +79,7 @@ public class CleanupQueue {
*/
protected void deletePath() throws IOException, InterruptedException {
final Path p = getPathForCleanup();
- UserGroupInformation.getLoginUser().doAs(
+ (ugi == null ? UserGroupInformation.getLoginUser() : ugi).doAs(
new PrivilegedExceptionAction<Object>() {
public Object run() throws IOException {
p.getFileSystem(conf).delete(p, true);
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1430876&r1=1430875&r2=1430876&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Jan 9 15:01:10 2013
@@ -3250,7 +3250,17 @@ public class JobInProgress {
Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
CleanupQueue.getInstance().addToQueue(
- new PathDeletionContext(tempDir, conf));
+ new PathDeletionContext(tempDir, conf));
+
+ // delete the staging area for the job
+ String jobTempDir = conf.get("mapreduce.job.dir");
+ if (jobTempDir != null && conf.getKeepTaskFilesPattern() == null &&
+ !conf.getKeepFailedTaskFiles()) {
+ Path jobTempDirPath = new Path(jobTempDir);
+ CleanupQueue.getInstance().addToQueue(
+ new PathDeletionContext(jobTempDirPath, conf, userUGI));
+ }
+
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
}
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1430876&r1=1430875&r2=1430876&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/Task.java Wed Jan 9 15:01:10 2013
@@ -1071,14 +1071,6 @@ abstract public class Task implements Wr
+ JobStatus.State.FAILED + " or "
+ JobStatus.State.KILLED);
}
- // delete the staging area for the job
- JobConf conf = new JobConf(jobContext.getConfiguration());
- if (!supportIsolationRunner(conf)) {
- String jobTempDir = conf.get("mapreduce.job.dir");
- Path jobTempDirPath = new Path(jobTempDir);
- FileSystem fs = jobTempDirPath.getFileSystem(conf);
- fs.delete(jobTempDirPath, true);
- }
done(umbilical, reporter);
}
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1430876&r1=1430875&r2=1430876&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Wed Jan 9 15:01:10 2013
@@ -21,11 +21,13 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.CountDownLatch;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -53,25 +55,27 @@ public class TestRecoveryManager {
private MiniMRCluster mr;
@Before
- public void setUp() {
- JobConf conf = new JobConf();
- try {
- fs = FileSystem.get(new Configuration());
- fs.delete(TEST_DIR, true);
- conf.set("mapred.jobtracker.job.history.block.size", "1024");
- conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
- mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
- } catch (IOException e) {
- e.printStackTrace();
- }
+ public void setUp() throws IOException {
+ fs = FileSystem.get(new Configuration());
+ fs.delete(TEST_DIR, true);
+ }
+
+ private void startCluster() throws IOException {
+ startCluster(new JobConf());
+ }
+
+ private void startCluster(JobConf conf) throws IOException {
+ mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
}
@After
public void tearDown() {
- ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
- .getClusterStatus(false);
- if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
- mr.shutdown();
+ if (mr != null) {
+ ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
+ .getClusterStatus(false);
+ if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
+ mr.shutdown();
+ }
}
}
@@ -87,6 +91,7 @@ public class TestRecoveryManager {
@Test(timeout=120000)
public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
LOG.info("Testing jobtracker restart with faulty job");
+ startCluster();
String signalFile = new Path(TEST_DIR, "signal").toString();
JobConf job1 = mr.createJobConf();
@@ -168,6 +173,7 @@ public class TestRecoveryManager {
@Test(timeout=120000)
public void testJobResubmission() throws Exception {
LOG.info("Testing Job Resubmission");
+ startCluster();
String signalFile = new Path(TEST_DIR, "signal").toString();
// make sure that the jobtracker is in recovery mode
@@ -216,6 +222,73 @@ public class TestRecoveryManager {
Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
}
+ public static class TestJobTrackerInstrumentation extends JobTrackerInstrumentation {
+ static final CountDownLatch finalizeCall = new CountDownLatch(1); // released by the first finalizeJob call
+
+ public TestJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
+ super(jt, conf);
+ }
+
+ @Override
+ public void finalizeJob(JobConf conf, JobID id) {
+ if (finalizeCall.getCount() > 0) { // only the first call fails; later calls are no-ops
+ finalizeCall.countDown();
+ throw new IllegalStateException("Controlled error finalizing job");
+ }
+ }
+ }
+
+ @Test(timeout=120000) // await() and the polling loop below have no bound of their own
+ public void testJobTrackerRestartBeforeJobFinalization() throws Exception {
+ LOG.info("Testing jobtracker restart before job finalization");
+
+ JobConf conf = new JobConf();
+ // make sure that the jobtracker is in recovery mode
+ conf.setBoolean("mapred.jobtracker.restart.recover", true);
+
+ // use a test JobTrackerInstrumentation implementation to shut down
+ // the jobtracker after the tasks have all completed, but
+ // before the job is finalized and check that it can be recovered correctly
+ conf.setClass("mapred.jobtracker.instrumentation", TestJobTrackerInstrumentation.class,
+ JobTrackerInstrumentation.class);
+
+ startCluster(conf);
+
+ JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+ SleepJob job = new SleepJob();
+ job.setConf(mr.createJobConf());
+ JobConf job1 = job.setupJobConf(1, 0, 1, 1, 1, 1);
+ JobClient jc = new JobClient(job1);
+ RunningJob rJob1 = jc.submitJob(job1);
+ LOG.info("Submitted first job " + rJob1.getID());
+
+ TestJobTrackerInstrumentation.finalizeCall.await();
+
+ // kill the jobtracker
+ LOG.info("Stopping jobtracker");
+ mr.stopJobTracker();
+
+ // start the jobtracker
+ LOG.info("Starting jobtracker");
+ mr.startJobTracker();
+ UtilsForTests.waitForJobTracker(jc);
+
+ jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+ // assert that job is recovered by the jobtracker
+ Assert.assertEquals("Resubmission failed ", 1,
+ jobtracker.getAllJobs().length);
+
+ // wait for job 1 to complete
+ JobInProgress jip = jobtracker.getJob(rJob1.getID());
+ while (!jip.isComplete()) {
+ LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+ UtilsForTests.waitFor(100);
+ }
+ Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
+ }
+
/**
* Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown
* during recovery. It does the following :
@@ -231,6 +304,7 @@ public class TestRecoveryManager {
@Test(timeout=120000)
public void testJobTrackerRestartWithBadJobs() throws Exception {
LOG.info("Testing recovery-manager");
+ startCluster();
String signalFile = new Path(TEST_DIR, "signal").toString();
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf()
@@ -362,6 +436,7 @@ public class TestRecoveryManager {
@Test(timeout=120000)
public void testRestartCount() throws Exception {
LOG.info("Testing Job Restart Count");
+ startCluster();
String signalFile = new Path(TEST_DIR, "signal").toString();
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf()