Author: omalley
Date: Fri Mar 4 03:36:57 2011
New Revision: 1077064
URL: http://svn.apache.org/viewvc?rev=1077064&view=rev
Log:
commit f2194ab0064e82e4d029763760d7af4680cee9f6
Author: Hemanth Yamijala <[email protected]>
Date: Tue Dec 1 09:15:25 2009 +0530
MAPREDUCE:1140 from
https://issues.apache.org/jira/secure/attachment/12426383/patch-1140-2-ydist.txt
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1140. Fix DistributedCache to not decrement reference counts for
+ unreferenced files in error conditions.
+ (Amareshwari Sriramadasu via yhemanth)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=1077064&r1=1077063&r2=1077064&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
Fri Mar 4 03:36:57 2011
@@ -216,38 +216,49 @@ public class DistributedCache {
lcacheStatus.refcount++;
}
- synchronized (lcacheStatus) {
- if (!lcacheStatus.isInited()) {
- localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
- fileStatus, isArchive);
- lcacheStatus.initComplete();
- } else {
- localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
- lcacheStatus, fileStatus, isArchive);
- }
- createSymlink(conf, cache, lcacheStatus, isArchive,
- currentWorkDir, honorSymLinkConf);
- }
-
- // try deleting stuff if you can
- long size = 0;
- synchronized (lcacheStatus) {
- synchronized (baseDirSize) {
- Long get = baseDirSize.get(lcacheStatus.getBaseDir());
- if ( get != null ) {
- size = get.longValue();
+ boolean initSuccessful = false;
+ try {
+ synchronized (lcacheStatus) {
+ if (!lcacheStatus.isInited()) {
+ localizedPath = localizeCache(conf, cache, confFileStamp,
+ lcacheStatus, fileStatus, isArchive);
+ lcacheStatus.initComplete();
} else {
- LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+ localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+ lcacheStatus, fileStatus, isArchive);
+ }
+ createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
+ honorSymLinkConf);
+ }
+
+ // try deleting stuff if you can
+ long size = 0;
+ synchronized (lcacheStatus) {
+ synchronized (baseDirSize) {
+ Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+ if (get != null) {
+ size = get.longValue();
+ } else {
+ LOG.warn("Cannot find size of baseDir: "
+ + lcacheStatus.getBaseDir());
+ }
+ }
+ }
+ // setting the cache size to a default of 10GB
+ long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
+ if (allowedSize < size) {
+ // try some cache deletions
+ deleteCache(conf);
+ }
+ initSuccessful = true;
+ return localizedPath;
+ } finally {
+ if (!initSuccessful) {
+ synchronized (cachedArchives) {
+ lcacheStatus.refcount--;
}
}
}
- // setting the cache size to a default of 10GB
- long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
- if (allowedSize < size) {
- // try some cache deletions
- deleteCache(conf);
- }
- return localizedPath;
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077064&r1=1077063&r2=1077064&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
Fri Mar 4 03:36:57 2011
@@ -140,9 +140,19 @@ abstract class TaskRunner extends Thread
return jobConf.get(JobConf.MAPRED_TASK_ENV);
}
+ private static class CacheFile {
+ URI uri;
+ long timeStamp;
+ CacheFile (URI uri, long timeStamp) {
+ this.uri = uri;
+ this.timeStamp = timeStamp;
+ }
+ }
+
@Override
public final void run() {
String errorInfo = "Child Error";
+ List<CacheFile> localizedCacheFiles = new ArrayList<CacheFile>();
try {
//before preparing the job localize
@@ -187,6 +197,8 @@ abstract class TaskRunner extends Thread
getAbsolutePath()),
false,
lDirAlloc);
+ localizedCacheFiles.add(new CacheFile(archives[i], Long
+ .parseLong(archivesTimestamps[i])));
}
DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
@@ -207,6 +219,8 @@ abstract class TaskRunner extends Thread
getAbsolutePath()),
false,
lDirAlloc);
+ localizedCacheFiles.add(new CacheFile(files[i], Long
+ .parseLong(fileTimestamps[i])));
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
@@ -523,22 +537,8 @@ abstract class TaskRunner extends Thread
}
} finally {
try{
- URI[] archives = DistributedCache.getCacheArchives(conf);
- URI[] files = DistributedCache.getCacheFiles(conf);
- String[] archivesTimestamps =
- DistributedCache.getArchiveTimestamps(conf);
- String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
- if (archives != null){
- for (int i = 0; i < archives.length; i++){
- DistributedCache.releaseCache(archives[i], conf,
- Long.parseLong(archivesTimestamps[i]));
- }
- }
- if (files != null){
- for(int i = 0; i < files.length; i++){
- DistributedCache.releaseCache(files[i], conf,
- Long.parseLong(fileTimestamps[i]));
- }
+ for (CacheFile cf : localizedCacheFiles){
+ DistributedCache.releaseCache(cf.uri, conf, cf.timeStamp);
}
}catch(IOException ie){
LOG.warn("Error releasing caches : Cache files might not have been cleaned up");