Author: ddas
Date: Mon Jun 15 06:05:01 2009
New Revision: 784661
URL: http://svn.apache.org/viewvc?rev=784661&view=rev
Log:
HADOOP-5921. Fixes a problem in the JobTracker where it sometimes never used to
come up due to a system file creation on JobTracker's system-dir failing. This
problem would sometimes show up only when the FS for the system-dir (usually
HDFS) is started at nearly the same time as the JobTracker. Contributed by Amar
Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=784661&r1=784660&r2=784661&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jun 15 06:05:01 2009
@@ -939,6 +939,12 @@
causing TestQueueCapacities to fail.
(Sreekanth Ramakrishnan via yhemanth)
+ HADOOP-5921. Fixes a problem in the JobTracker where it sometimes never used
+ to come up due to a system file creation on JobTracker's system-dir failing.
+ This problem would sometimes show up only when the FS for the system-dir
+ (usually HDFS) is started at nearly the same time as the JobTracker.
+ (Amar Kamat via ddas)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=784661&r1=784660&r2=784661&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Jun 15 06:05:01 2009
@@ -121,7 +121,7 @@
private int NUM_HEARTBEATS_IN_SECOND = 100;
public static enum State { INITIALIZING, RUNNING }
State state = State.INITIALIZING;
- private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
+ private static final int FS_ACCESS_RETRY_PERIOD = 10000;
private DNSToSwitchMapping dnsToSwitchMapping;
private NetworkTopology clusterMap = new NetworkTopology();
@@ -1194,17 +1194,38 @@
shouldRecover = false;
// write the jobtracker.info file
- FSDataOutputStream out = FileSystem.create(fs, restartFile, filePerm);
- out.writeInt(0);
- out.close();
+ try {
+ FSDataOutputStream out = FileSystem.create(fs, restartFile,
+ filePerm);
+ out.writeInt(0);
+ out.close();
+ } catch (IOException ioe) {
+ LOG.warn("Writing to file " + restartFile + " failed!");
+ LOG.warn("FileSystem is not ready yet!");
+ fs.delete(restartFile, false);
+ throw ioe;
+ }
return;
}
FSDataInputStream in = fs.open(restartFile);
- // read the old count
- restartCount = in.readInt();
- ++restartCount; // increment the restart count
- in.close();
+ try {
+ // read the old count
+ restartCount = in.readInt();
+ ++restartCount; // increment the restart count
+ } catch (IOException ioe) {
+ LOG.warn("System directory is garbled. Failed to read file "
+ + restartFile);
+ LOG.warn("Jobtracker recovery is not possible with garbled"
+ + " system directory! Please delete the system directory and"
+ + " restart the jobtracker. Note that deleting the system"
+ + " directory will result in loss of all the running jobs.");
+ throw new RuntimeException(ioe);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
// Write back the new restart count and rename the old info file
//TODO This is similar to jobhistory recovery, maybe this common code
@@ -1725,24 +1746,7 @@
}
LOG.info("problem cleaning system directory: " + systemDir, ie);
}
- Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
- }
-
- // Prepare for recovery. This is done irrespective of the status of restart
- // flag.
- try {
- recoveryManager.updateRestartCount();
- } catch (IOException ioe) {
- LOG.warn("Failed to initialize recovery manager. The Recovery manager "
- + "failed to access the system files in the system dir ("
- + getSystemDir() + ").");
- LOG.warn("It might be because the JobTracker failed to read/write system"
- + " files (" + recoveryManager.getRestartCountFile() + " / "
- + recoveryManager.getTempRestartCountFile() + ") or the system "
- + " file " + recoveryManager.getRestartCountFile()
- + " is missing!");
- LOG.warn("Bailing out...");
- throw ioe;
+ Thread.sleep(FS_ACCESS_RETRY_PERIOD);
}
// Same with 'localDir' except it's always on the local disk.
@@ -1860,6 +1864,20 @@
* Run forever
*/
public void offerService() throws InterruptedException, IOException {
+ // Prepare for recovery. This is done irrespective of the status of restart
+ // flag.
+ while (true) {
+ try {
+ recoveryManager.updateRestartCount();
+ break;
+ } catch (IOException ioe) {
+ LOG.warn("Failed to initialize recovery manager. ", ioe);
+ // wait for some time
+ Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+ LOG.warn("Retrying...");
+ }
+ }
+
taskScheduler.start();
// Start the recovery after starting the scheduler
Modified:
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=784661&r1=784660&r2=784661&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java Mon Jun 15 06:05:01 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
@@ -310,7 +311,7 @@
fs.delete(rFile,false);
// start the jobtracker
- LOG.info("Stopping jobtracker with system files deleted");
+ LOG.info("Starting jobtracker with system files deleted");
mr.startJobTracker();
UtilsForTests.waitForJobTracker(jc);
@@ -394,8 +395,58 @@
LOG.info("Starting jobtracker with fs errors");
mr.startJobTracker();
JobTrackerRunner runner = mr.getJobTrackerRunner();
- assertFalse("Restart count for new job is incorrect", runner.isActive());
+ assertFalse("JobTracker is still alive", runner.isActive());
mr.shutdown();
}
+
+ /**
+ * Test that updating the restart count fails cleanly (leaving no info files
+ * behind) while no datanodes are up, and succeeds once a datanode is started.
+ */
+ public void testJobTrackerInfoCreation() throws Exception {
+ LOG.info("Testing jobtracker.info file");
+ MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
+ String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+ + (dfs.getFileSystem()).getUri().getPort();
+ // shut down the data nodes so that file creation on the dfs fails
+ dfs.shutdownDataNodes();
+
+ // start the jobtracker
+ JobConf conf = new JobConf();
+ FileSystem.setDefaultUri(conf, namenode);
+ conf.set("mapred.job.tracker", "localhost:0");
+ conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+
+ JobTracker jobtracker = new JobTracker(conf);
+
+ // with no datanodes running, updating the restart count is expected to fail
+ boolean failed = false;
+ try {
+ jobtracker.recoveryManager.updateRestartCount();
+ } catch (IOException ioe) {
+ failed = true;
+ }
+ assertTrue("JobTracker created info files without datanodes!!!", failed);
+
+ Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
+ Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
+ FileSystem fs = dfs.getFileSystem();
+ assertFalse("Info file exists after update failure",
+ fs.exists(restartFile));
+ assertFalse("Temporary restart-file exists after update failure",
+ fs.exists(tmpRestartFile));
+
+ // start 1 data node
+ dfs.startDataNodes(conf, 1, true, null, null, null, null);
+ dfs.waitActive();
+
+ failed = false;
+ try {
+ jobtracker.recoveryManager.updateRestartCount();
+ } catch (IOException ioe) {
+ failed = true;
+ }
+ assertFalse("JobTracker failed to create info files with datanodes!!!", failed);
+ }
}