Author: kasha
Date: Tue Aug 5 06:31:39 2014
New Revision: 1615868
URL: http://svn.apache.org/r1615868
Log:
MAPREDUCE-5968. Work directory is not deleted when downloadCacheObject throws
IOException. (Zhihai Xu via kasha)
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1615868&r1=1615867&r2=1615868&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Aug 5 06:31:39 2014
@@ -233,6 +233,9 @@ Release 1.3.0 - unreleased
MAPREDUCE-5966. MR1 FairScheduler use of custom weight adjuster is not
thread safe for comparisons. (Anubhav Dhoot via kasha)
+ MAPREDUCE-5968. Work directory is not deleted when downloadCacheObject
+ throws IOException. (Zhihai Xu via kasha)
+
Release 1.2.2 - unreleased
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1615868&r1=1615867&r2=1615868&view=diff
==============================================================================
---
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
(original)
+++
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
Tue Aug 5 06:31:39 2014
@@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.secur
* interface.</b>
*/
public class TrackerDistributedCacheManager {
+ public static final String WORK_DIR_FIX = "-work-";
// cacheID to cacheStatus mapping
private LinkedHashMap<String, CacheStatus> cachedArchives =
new LinkedHashMap<String, CacheStatus>();
@@ -372,7 +373,7 @@ public class TrackerDistributedCacheMana
}
private static Path createRandomPath(Path base) throws IOException {
- return new Path(base.toString() + "-work-" + random.nextLong());
+ return new Path(base.toString() + WORK_DIR_FIX + random.nextLong());
}
/**
@@ -427,39 +428,44 @@ public class TrackerDistributedCacheMana
if (!localFs.mkdirs(workDir, permission)) {
throw new IOException("Mkdirs failed to create directory " + workDir);
}
- Path workFile = new Path(workDir, parchive.getName());
- sourceFs.copyToLocalFile(sourcePath, workFile);
- localFs.setPermission(workFile, permission);
- if (isArchive) {
- String tmpArchive = workFile.getName().toLowerCase();
- File srcFile = new File(workFile.toString());
- File destDir = new File(workDir.toString());
- LOG.info(String.format("Extracting %s to %s",
- srcFile.toString(), destDir.toString()));
- if (tmpArchive.endsWith(".jar")) {
- RunJar.unJar(srcFile, destDir);
- } else if (tmpArchive.endsWith(".zip")) {
- FileUtil.unZip(srcFile, destDir);
- } else if (isTarFile(tmpArchive)) {
- FileUtil.unTar(srcFile, destDir);
- } else {
- LOG.warn(String.format(
- "Cache file %s specified as archive, but not valid extension.",
- srcFile.toString()));
- // else will not do anyhting
- // and copy the file into the dir as it is
- }
- FileUtil.chmod(destDir.toString(), "ugo+rx", true);
- }
- // promote the output to the final location
- if (!localFs.rename(workDir, finalDir)) {
- localFs.delete(workDir, true);
- if (!localFs.exists(finalDir)) {
- throw new IOException("Failed to promote distributed cache object " +
- workDir + " to " + finalDir);
+ try {
+ Path workFile = new Path(workDir, parchive.getName());
+ sourceFs.copyToLocalFile(sourcePath, workFile);
+ localFs.setPermission(workFile, permission);
+ if (isArchive) {
+ String tmpArchive = workFile.getName().toLowerCase();
+ File srcFile = new File(workFile.toString());
+ File destDir = new File(workDir.toString());
+ LOG.info(String.format("Extracting %s to %s",
+ srcFile.toString(), destDir.toString()));
+ if (tmpArchive.endsWith(".jar")) {
+ RunJar.unJar(srcFile, destDir);
+ } else if (tmpArchive.endsWith(".zip")) {
+ FileUtil.unZip(srcFile, destDir);
+ } else if (isTarFile(tmpArchive)) {
+ FileUtil.unTar(srcFile, destDir);
+ } else {
+ LOG.warn(String.format(
+ "Cache file %s specified as archive, but not valid extension.",
+ srcFile.toString()));
+ // else will not do anyhting
+ // and copy the file into the dir as it is
+ }
+ FileUtil.chmod(destDir.toString(), "ugo+rx", true);
+ }
+ // promote the output to the final location
+ if (!localFs.rename(workDir, finalDir)) {
+ if (!localFs.exists(finalDir)) {
+ throw new IOException("Failed to promote distributed cache object " +
+ workDir + " to " + finalDir);
+ }
+ // someone else promoted first
+ return 0;
+ }
+ } finally {
+ if (localFs.exists(workDir)) {
+ localFs.delete(workDir, true);
}
- // someone else promoted first
- return 0;
}
LOG.info(String.format("Cached %s as %s",
Modified:
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1615868&r1=1615867&r2=1615868&view=diff
==============================================================================
---
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
(original)
+++
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Tue Aug 5 06:31:39 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsAction;
@@ -57,6 +58,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ReflectionUtils;
public class TestTrackerDistributedCacheManager extends TestCase {
@@ -1239,4 +1241,48 @@ public class TestTrackerDistributedCache
assertNull(taskDistributedCacheManager);
}
+ public void testRemoveWorkDirInDownloadCacheObject() throws Exception {
+ if (!canRun()) {
+ return;
+ }
+ // This is to test the workDir is removed, when IOException happened
+ // use TestFileSystem to generate an IOException,
+ // then verify whether the workDir is deleted.
+ FsPermission filePerm = FsPermission.createImmutable((short)0755);
+ Configuration myConf = new Configuration(conf);
+ myConf.setClass("fs.test.impl", TestFileSystem.class, FileSystem.class);
+ Path testDir = new Path(TEST_ROOT_DIR, "testDir");
+ Path destination = new Path(testDir.toString(),
+ "downloadCacheObjectTestDir");
+ Path fileToCache = new Path("test:///" + destination.toUri().getPath());
+ try {
+ TrackerDistributedCacheManager.downloadCacheObject(myConf,
+ fileToCache.toUri(), destination, 0L, false, filePerm);
+ fail("did not throw an exception");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains(
+ "Force an IOException for test", e);
+ }
+ File TEST_ROOT = new File(TEST_ROOT_DIR);
+ String workDir = destination.getParent().toString() +
+ TrackerDistributedCacheManager.WORK_DIR_FIX;
+ for (File f : TEST_ROOT.listFiles()) {
+ assertFalse(f.toString().contains(workDir));
+ }
+ }
+
+ static class TestFileSystem extends LocalFileSystem {
+ public TestFileSystem() {
+ super();
+ }
+ @Override
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+ throws IOException {
+ throw new IOException("Force an IOException for test");
+ }
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return new FileStatus();
+ }
+ }
}