Author: omalley
Date: Fri Mar 4 03:43:24 2011
New Revision: 1077124
URL: http://svn.apache.org/viewvc?rev=1077124&view=rev
Log:
commit 61507d10e1103b878f72d7da49dfc08a6b3d61ed
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date: Sat Jan 23 20:27:58 2010 +0530
MAPREDUCE-1140 from
https://issues.apache.org/jira/secure/attachment/12431213/patch-1140-3-y20.txt
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1140. Fix DistributedCache to not decrement reference counts
+ for unreferenced files in error conditions.
+ (Amareshwari Sriramadasu via yhemanth)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1077124&r1=1077123&r2=1077124&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
Fri Mar 4 03:43:24 2011
@@ -33,7 +33,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -73,6 +72,7 @@ public class TaskDistributedCacheManager
final long timestamp;
/** Whether this is to be added to the classpath */
final boolean shouldBeAddedToClassPath;
+ boolean localized = false;
private CacheFile(URI uri, FileType type, long timestamp,
boolean classPath) {
@@ -109,6 +109,14 @@ public class TaskDistributedCacheManager
}
return ret;
}
+
+ boolean getLocalized() {
+ return localized;
+ }
+
+ void setLocalized(boolean val) {
+ localized = val;
+ }
}
TaskDistributedCacheManager(
@@ -157,6 +165,7 @@ public class TaskDistributedCacheManager
cacheSubdir, fileStatus,
cacheFile.type == CacheFile.FileType.ARCHIVE,
cacheFile.timestamp, workdirPath, false);
+ cacheFile.setLocalized(true);
if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
localArchives.add(p);
@@ -179,6 +188,13 @@ public class TaskDistributedCacheManager
}
+ /*
+ * This method is called from unit tests.
+ */
+ List<CacheFile> getCacheFiles() {
+ return cacheFiles;
+ }
+
private static String stringifyPathList(List<Path> p){
if (p == null || p.isEmpty()) {
return null;
@@ -210,7 +226,9 @@ public class TaskDistributedCacheManager
*/
public void release() throws IOException {
for (CacheFile c : cacheFiles) {
- distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
+ if (c.getLocalized()) {
+ distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
+ }
}
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077124&r1=1077123&r2=1077124&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
Fri Mar 4 03:43:24 2011
@@ -129,39 +129,50 @@ public class TrackerDistributedCacheMana
lcacheStatus.refcount++;
}
- // do the localization, after releasing the global lock
- synchronized (lcacheStatus) {
- if (!lcacheStatus.isInited()) {
- localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
- fileStatus, isArchive);
- lcacheStatus.initComplete();
- } else {
- localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
- lcacheStatus, fileStatus, isArchive);
+ boolean initSuccessful = false;
+ try {
+ // do the localization, after releasing the global lock
+ synchronized (lcacheStatus) {
+ if (!lcacheStatus.isInited()) {
+ localizedPath = localizeCache(conf, cache, confFileStamp,
+ lcacheStatus, fileStatus, isArchive);
+ lcacheStatus.initComplete();
+ } else {
+ localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+ lcacheStatus, fileStatus, isArchive);
+ }
+ createSymlink(conf, cache, lcacheStatus, isArchive,
+ currentWorkDir, honorSymLinkConf);
}
- createSymlink(conf, cache, lcacheStatus, isArchive,
- currentWorkDir, honorSymLinkConf);
- }
- // try deleting stuff if you can
- long size = 0;
- synchronized (lcacheStatus) {
- synchronized (baseDirSize) {
- Long get = baseDirSize.get(lcacheStatus.getBaseDir());
- if ( get != null ) {
- size = get.longValue();
- } else {
- LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+ // try deleting stuff if you can
+ long size = 0;
+ synchronized (lcacheStatus) {
+ synchronized (baseDirSize) {
+ Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+ if (get != null) {
+ size = get.longValue();
+ } else {
+ LOG.warn("Cannot find size of baseDir: "
+ + lcacheStatus.getBaseDir());
+ }
+ }
+ }
+ // setting the cache size to a default of 10GB
+ long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
+ if (allowedSize < size) {
+ // try some cache deletions
+ deleteCache(conf);
+ }
+ initSuccessful = true;
+ return localizedPath;
+ } finally {
+ if (!initSuccessful) {
+ synchronized (cachedArchives) {
+ lcacheStatus.refcount--;
}
}
}
- // setting the cache size to a default of 10GB
- long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
- if (allowedSize < size) {
- // try some cache deletions
- deleteCache(conf);
- }
- return localizedPath;
}
/**
@@ -188,6 +199,21 @@ public class TrackerDistributedCacheMana
}
}
+ /*
+ * This method is called from unit tests.
+ */
+ int getReferenceCount(URI cache, Configuration conf, long timeStamp)
+ throws IOException {
+ String key = getKey(cache, conf, timeStamp);
+ synchronized (cachedArchives) {
+ CacheStatus lcacheStatus = cachedArchives.get(key);
+ if (lcacheStatus == null) {
+ throw new IOException("Cannot find localized cache: " + cache);
+ }
+ return lcacheStatus.refcount;
+ }
+ }
+
// To delete the caches which have a refcount of zero
private void deleteCache(Configuration conf) throws IOException {
@@ -294,7 +320,7 @@ public class TrackerDistributedCacheMana
//the method which actually copies the caches locally and unjars/unzips them
// and does chmod for the files
- private Path localizeCache(Configuration conf,
+ Path localizeCache(Configuration conf,
URI cache, long confFileStamp,
CacheStatus cacheStatus,
FileStatus fileStatus,
@@ -432,7 +458,7 @@ public class TrackerDistributedCacheMana
}
}
- private static class CacheStatus {
+ static class CacheStatus {
// the local load path of this cache
Path localLoadPath;
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077124&r1=1077123&r2=1077124&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Fri Mar 4 03:43:24 2011
@@ -22,18 +22,23 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Random;
import javax.security.auth.login.LoginException;
import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.DefaultTaskController;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskController.InitializationContext;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -45,7 +50,10 @@ import org.apache.hadoop.filecache.TaskD
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.security.UserGroupInformation;
+
public class TestTrackerDistributedCacheManager extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestTrackerDistributedCacheManager.class);
protected String TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp"),
@@ -94,12 +102,24 @@ public class TestTrackerDistributedCache
}
/**
+ * Whether the test can run on the machine
+ *
+ * @return true if test can run on the machine, false otherwise
+ */
+ protected boolean canRun() {
+ return true;
+ }
+
+ /**
* This is the typical flow for using the DistributedCache classes.
*
* @throws IOException
* @throws LoginException
*/
public void testManagerFlow() throws IOException, LoginException {
+ if (!canRun()) {
+ return;
+ }
// ****** Imitate JobClient code
// Configures a task/job with both a regular file and a "classpath" file.
@@ -153,6 +173,101 @@ public class TestTrackerDistributedCache
}
/**
+ * This DistributedCacheManager fails in localizing firstCacheFile.
+ */
+ public class FakeTrackerDistributedCacheManager extends
+ TrackerDistributedCacheManager {
+ public FakeTrackerDistributedCacheManager(Configuration conf)
+ throws IOException {
+ super(conf);
+ }
+
+ @Override
+ Path localizeCache(Configuration conf, URI cache, long confFileStamp,
+ CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive)
+ throws IOException {
+ if (cache.equals(firstCacheFile.toUri())) {
+ throw new IOException("fake fail");
+ }
+ return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
+ fileStatus, isArchive);
+ }
+ }
+
+ public void testReferenceCount() throws IOException, LoginException,
+ URISyntaxException {
+ if (!canRun()) {
+ return;
+ }
+ Configuration conf = new Configuration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+ TrackerDistributedCacheManager manager =
+ new FakeTrackerDistributedCacheManager(conf);
+
+ String userName = getJobOwnerName();
+ File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
+
+ // Configures a job with a regular file
+ Job job1 = new Job(conf);
+ Configuration conf1 = job1.getConfiguration();
+ DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
+
+ TrackerDistributedCacheManager.determineTimestamps(conf1);
+
+ // Task localizing for first job
+ TaskDistributedCacheManager handle = manager
+ .newTaskDistributedCacheManager(conf1);
+ handle.setup(localDirAllocator, workDir, TaskTracker
+ .getDistributedCacheDir(userName));
+ handle.release();
+ for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
+ assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
+ }
+
+ Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+ createTempFile(thirdCacheFile);
+
+ // Configures another job with three regular files.
+ Job job2 = new Job(conf);
+ Configuration conf2 = job2.getConfiguration();
+ // add a file that would get failed to localize
+ DistributedCache.addCacheFile(firstCacheFile.toUri(), conf2);
+ // add a file that is already localized by different job
+ DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
+ // add a file that is never localized
+ DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
+
+ TrackerDistributedCacheManager.determineTimestamps(conf2);
+
+ // Task localizing for second job
+ // localization for the "firstCacheFile" will fail.
+ handle = manager.newTaskDistributedCacheManager(conf2);
+ Throwable th = null;
+ try {
+ handle.setup(localDirAllocator, workDir, TaskTracker
+ .getDistributedCacheDir(userName));
+ } catch (IOException e) {
+ th = e;
+ LOG.info("Exception during setup", e);
+ }
+ assertNotNull(th);
+ assertTrue(th.getMessage().contains("fake fail"));
+ handle.release();
+ th = null;
+ for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
+ try {
+ assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp));
+ } catch (IOException ie) {
+ th = ie;
+ LOG.info("Exception getting reference count for " + c.uri, ie);
+ }
+ }
+ assertNotNull(th);
+ assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
+ fs.delete(thirdCacheFile, false);
+ }
+
+ /**
* Check proper permissions on the cache files
*
* @param localCacheFiles
@@ -180,6 +295,9 @@ public class TestTrackerDistributedCache
/** test delete cache */
public void testDeleteCache() throws Exception {
+ if (!canRun()) {
+ return;
+ }
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf);
FileSystem localfs = FileSystem.getLocal(conf);
@@ -204,6 +322,9 @@ public class TestTrackerDistributedCache
}
public void testFileSystemOtherThanDefault() throws Exception {
+ if (!canRun()) {
+ return;
+ }
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf);
conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
@@ -262,6 +383,9 @@ public class TestTrackerDistributedCache
}
public void testFreshness() throws Exception {
+ if (!canRun()) {
+ return;
+ }
Configuration myConf = new Configuration(conf);
myConf.set("fs.default.name", "refresh:///");
myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1077124&r1=1077123&r2=1077124&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
Fri Mar 4 03:43:24 2011
@@ -21,8 +21,6 @@ package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
-import javax.security.auth.login.LoginException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -87,19 +85,9 @@ public class TestTrackerDistributedCache
super.tearDown();
}
- /**
- * Test the control flow of distributed cache manager when LinuxTaskController
- * is used.
- */
@Override
- public void testManagerFlow()
- throws IOException,
- LoginException {
- if (!ClusterWithLinuxTaskController.shouldRun()) {
- return;
- }
-
- super.testManagerFlow();
+ protected boolean canRun() {
+ return ClusterWithLinuxTaskController.shouldRun();
}
@Override
@@ -165,30 +153,4 @@ public class TestTrackerDistributedCache
path = path.getParentFile();
}
}
-
- @Override
- public void testDeleteCache()
- throws Exception {
- if (!ClusterWithLinuxTaskController.shouldRun()) {
- return;
- }
- super.testDeleteCache();
- }
-
- @Override
- public void testFileSystemOtherThanDefault()
- throws Exception {
- if (!ClusterWithLinuxTaskController.shouldRun()) {
- return;
- }
- super.testFileSystemOtherThanDefault();
- }
-
- @Override
- public void testFreshness() throws Exception {
- if (!ClusterWithLinuxTaskController.shouldRun()) {
- return;
- }
- super.testFreshness();
- }
}