Author: mattf
Date: Sun Feb 19 23:28:40 2012
New Revision: 1291091
URL: http://svn.apache.org/viewvc?rev=1291091&view=rev
Log:
MAPREDUCE-3824. Distributed caches are not removed properly. Contributed by
Thomas Graves.
Modified:
hadoop/common/branches/branch-1.0/CHANGES.txt
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1291091&r1=1291090&r2=1291091&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sun Feb 19 23:28:40 2012
@@ -51,6 +51,8 @@ Release 1.0.1 - 2012.02.19
HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)
+ MAPREDUCE-3824. Distributed caches are not removed properly. (Thomas Graves
+ via mattf)
Release 1.0.0 - 2011.12.15
Modified:
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1291091&r1=1291090&r2=1291091&view=diff
==============================================================================
Review note: setSizes() now records a size for every non-public CacheFile
that has a non-null status, not only private archives. It also advances the
index i for every CacheFile, public or not, so sizes[] is read positionally
against cacheFiles.
NOTE(review): this assumes the array passed in has one entry per CacheFile,
in the same order as cacheFiles. The JobLocalizer hunk of this commit
concatenates file sizes before archive sizes for that reason. A shorter
array would make sizes[i] throw ArrayIndexOutOfBoundsException; confirm.
---
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
(original)
+++
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
Sun Feb 19 23:28:40 2012
@@ -259,10 +259,10 @@ public class TaskDistributedCacheManager
public void setSizes(long[] sizes) throws IOException {
int i = 0;
for (CacheFile c: cacheFiles) {
- if (!c.isPublic && c.type == CacheFile.FileType.ARCHIVE &&
- c.status != null) {
- distributedCacheManager.setSize(c.status, sizes[i++]);
+ if (!c.isPublic && c.status != null) {
+ distributedCacheManager.setSize(c.status, sizes[i]);
}
+ i++;
}
}
Modified:
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1291091&r1=1291090&r2=1291091&view=diff
==============================================================================
Review note: incRefCount() and decRefCount() now use the value returned by
incrementAndGet()/decrementAndGet() for both the debug log and the
zero-reference test. Previously they re-read the counter with a separate get().
NOTE(review): the unchanged comment above refcount still says it is accessed
under the global cachedArchives lock. However, incRefCount() only synchronizes
on the CacheStatus instance. The AtomicInteger keeps the counter itself
consistent, but that comment is now misleading and should be updated.
Lines wrapped by the mail archive (file headers, long LOG.debug calls, and two
context lines) are rejoined so that each diff line is a single line.
--- hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Sun Feb 19 23:28:40 2012
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -546,7 +547,7 @@ public class TrackerDistributedCacheMana
//
// This field should be accessed under global cachedArchives lock.
//
- private int refcount; // number of instances using this cache
+ private AtomicInteger refcount; // number of instances using this cache
//
// The following two fields should be accessed under
@@ -577,7 +578,7 @@ public class TrackerDistributedCacheMana
String uniqueString, String user, String key) {
super();
this.localizedLoadPath = localLoadPath;
- this.refcount = 0;
+ this.refcount = new AtomicInteger();
this.localizedBaseDir = baseDir;
this.size = 0;
this.subDir = subDir;
@@ -587,14 +588,16 @@ public class TrackerDistributedCacheMana
}
public synchronized void incRefCount() {
- refcount += 1;
+ int newCount = refcount.incrementAndGet();
+ LOG.debug(localizedLoadPath + ": refcount=" + newCount);
}
public void decRefCount() {
synchronized (cachedArchives) {
synchronized (this) {
- refcount -= 1;
- if(refcount <= 0) {
+ int newCount = refcount.decrementAndGet();
+ LOG.debug(localizedLoadPath + ": refcount=" + newCount);
+ if(newCount <= 0) {
String key = this.key;
cachedArchives.remove(key);
cachedArchives.put(key, this);
@@ -604,11 +607,12 @@ public class TrackerDistributedCacheMana
}
public int getRefCount() {
- return refcount;
+ return refcount.get();
}
public synchronized boolean isUsed() {
- return refcount > 0;
+ LOG.debug(localizedLoadPath + ": refcount=" + refcount.get());
+ return refcount.get() > 0;
}
Path getBaseDir(){
@@ -641,7 +645,8 @@ public class TrackerDistributedCacheMana
try {
localFs.delete(f.getValue().localizedLoadPath, true);
} catch (IOException ie) {
- LOG.debug("Error cleaning up cache", ie);
+ LOG.debug("Error cleaning up cache (" +
+ f.getValue().localizedLoadPath + ")", ie);
}
}
cachedArchives.clear();
@@ -657,6 +662,10 @@ public class TrackerDistributedCacheMana
return result;
}
+ /**
+ * Set the sizes for any archives, files, or directories in the private
+ * distributed cache.
+ */
public void setArchiveSizes(JobID jobId, long[] sizes) throws IOException {
TaskDistributedCacheManager mgr = jobArchives.get(jobId);
if (mgr != null) {
@@ -978,8 +987,13 @@ public class TrackerDistributedCacheMana
HashMap<Path, CacheDir> toBeCleanedBaseDir =
new HashMap<Path, CacheDir>();
synchronized (properties) {
+ LOG.debug("checkAndCleanup: Allowed Cache Size test");
for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
CacheDir baseDirCounts = baseDir.getValue();
+ LOG.debug(baseDir.getKey() + ": allowedCacheSize=" + allowedCacheSize +
+ ",baseDirCounts.size=" + baseDirCounts.size +
+ ",allowedCacheSubdirs=" + allowedCacheSubdirs +
+ ",baseDirCounts.subdirs=" + baseDirCounts.subdirs);
if (allowedCacheSize < baseDirCounts.size ||
allowedCacheSubdirs < baseDirCounts.subdirs) {
CacheDir tcc = new CacheDir();
@@ -991,6 +1005,7 @@ public class TrackerDistributedCacheMana
}
// try deleting cache Status with refcount of zero
synchronized (cachedArchives) {
+ LOG.debug("checkAndCleanup: Global Cache Size Check");
for(
Iterator<Map.Entry<String, CacheStatus>> it
= cachedArchives.entrySet().iterator();
@@ -999,11 +1014,16 @@ public class TrackerDistributedCacheMana
String cacheId = entry.getKey();
CacheStatus cacheStatus = cachedArchives.get(cacheId);
CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir());
+
if (leftToClean != null && (leftToClean.size > 0 || leftToClean.subdirs > 0)) {
synchronized (cacheStatus) {
// if reference count is zero mark the cache for deletion
- if (!cacheStatus.isUsed()) {
- leftToClean.size -= cacheStatus.size;
+ boolean isUsed = cacheStatus.isUsed();
+ long cacheSize = cacheStatus.size;
+ LOG.debug(cacheStatus.getLocalizedUniqueDir() + ": isUsed=" + isUsed +
+ " size=" + cacheSize + " leftToClean.size=" + leftToClean.size);
+ if (!isUsed) {
+ leftToClean.size -= cacheSize;
leftToClean.subdirs--;
// delete this cache entry from the global list
// and mark the localized file for deletion
Modified:
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java?rev=1291091&r1=1291090&r2=1291091&view=diff
==============================================================================
Review note: downloadPrivateCache() now returns the cache file sizes followed
by the cache archive sizes, via ArrayUtils.addAll(fileSizes, archiveSizes).
Its @return Javadoc, an unchanged context line in the original patch, still
described only archive objects, so it is updated here. This keeps the hunk's
line counts unchanged. The wrapped file headers are rejoined.
--- hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java (original)
+++ hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java Sun Feb 19 23:28:40 2012
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -339,21 +340,25 @@ public class JobLocalizer {
- * @return the size of the archive objects
+ * @return the sizes of the cache files followed by those of the cache archives
*/
public static long[] downloadPrivateCache(Configuration conf) throws
IOException {
- downloadPrivateCacheObjects(conf,
+ long[] fileSizes = downloadPrivateCacheObjects(conf,
DistributedCache.getCacheFiles(conf),
DistributedCache.getLocalCacheFiles(conf),
DistributedCache.getFileTimestamps(conf),
TrackerDistributedCacheManager.
getFileVisibilities(conf),
false);
- return
- downloadPrivateCacheObjects(conf,
+
+ long[] archiveSizes = downloadPrivateCacheObjects(conf,
DistributedCache.getCacheArchives(conf),
DistributedCache.getLocalCacheArchives(conf),
DistributedCache.getArchiveTimestamps(conf),
TrackerDistributedCacheManager.
getArchiveVisibilities(conf),
true);
+
+ // The order here matters - it has to match order of cache files
+ // in TaskDistributedCacheManager.
+ return ArrayUtils.addAll(fileSizes, archiveSizes);
}
public void localizeJobFiles(JobID jobid, JobConf jConf,
Modified:
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1291091&r1=1291090&r2=1291091&view=diff
==============================================================================
Review note: with this commit the reported sizes cover every non-public cache
entry with a status, not only archives (see setSizes() in
TaskDistributedCacheManager). The Javadoc therefore names files, directories,
and archives, matching the setArchiveSizes() Javadoc added in
TrackerDistributedCacheManager. The wrapped file headers are rejoined.
--- hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Sun Feb 19 23:28:40 2012
@@ -173,7 +173,7 @@ public interface TaskUmbilicalProtocol e
/**
- * The job initializer needs to report the sizes of the archive
- * objects in the private distributed cache.
+ * The job initializer needs to report the sizes of the files,
+ * directories, and archives in the private distributed cache.
* @param jobId the job to update
* @param sizes the array of sizes that were computed
* @throws IOException
Modified:
hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1291091&r1=1291090&r2=1291091&view=diff
==============================================================================
Review note: fixes to the test code added by this patch.
(1) The directory-cache job set "user.name" on conf1 instead of its own conf2.
(2) The loop logging sizes2 used "j > sizes2.length", so it never ran. It also
    dereferenced sizes2 before the null check. It now runs inside that check
    and uses "<".
(3) createTempDir() ignored the result of File.mkdirs(). It now throws an
    IOException when the directory does not exist afterwards.
Hunk line counts are unchanged. Lines wrapped by the mail archive are rejoined.
--- hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Sun Feb 19 23:28:40 2012
@@ -77,6 +77,10 @@ public class TestTrackerDistributedCache
protected Path firstCacheFilePublic;
protected Path secondCacheFile;
protected Path secondCacheFilePublic;
+ protected Path firstCacheDirPublic;
+ protected Path firstCacheDirPrivate;
+ protected Path firstCacheFileInDirPublic;
+ protected Path firstCacheFileInDirPrivate;
private FileSystem fs;
protected LocalDirAllocator localDirAllocator =
@@ -126,6 +130,15 @@ public class TestTrackerDistributedCache
createPublicTempFile(secondCacheFilePublic);
createPrivateTempFile(firstCacheFile);
createPrivateTempFile(secondCacheFile);
+
+ firstCacheDirPublic = new Path(TEST_ROOT_DIR, "firstcachedirPublic");
+ firstCacheDirPrivate = new Path(TEST_ROOT_DIR, "firstcachedirPrivate");
+ firstCacheFileInDirPublic = new Path(firstCacheDirPublic, "firstcacheFileinDirPublic.txt");
+ firstCacheFileInDirPrivate = new Path(firstCacheDirPrivate, "firstcacheFileinDirPrivate.txt");
+ createPublicTempDir(firstCacheDirPublic);
+ createPrivateTempDir(firstCacheDirPrivate);
+ createPublicTempFile(firstCacheFileInDirPublic);
+ createPrivateTempFile(firstCacheFileInDirPrivate);
}
protected void refreshConf(Configuration conf) throws IOException {
@@ -253,41 +266,79 @@ public class TestTrackerDistributedCache
TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
// Task localizing for first job
+ JobID jobId = new JobID("jt", 1);
TaskDistributedCacheManager handle = manager
- .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+ .newTaskDistributedCacheManager(jobId, conf1);
handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
TaskTracker.getPrivateDistributedCacheDir(userName));
- JobLocalizer.downloadPrivateCache(conf1);
+ long[] sizes = JobLocalizer.downloadPrivateCache(conf1);
+ if (sizes != null) {
+ manager.setArchiveSizes(jobId, sizes);
+ }
+ handle.release();
+ for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
+ assertEquals(0, manager.getReferenceCount(c.getStatus()));
+ long filesize = FileUtil.getDU(new File(c.getStatus().localizedLoadPath.getParent().toString()));
+ assertTrue("filesize is not greater than 0", filesize > 0);
+ assertEquals(filesize, c.getStatus().size);
+ }
+
+ // Test specifying directories to go into distributed cache and make
+ // their sizes are calculated properly.
+ Job job2 = new Job(conf);
+ Configuration conf2 = job2.getConfiguration();
+ conf2.set("user.name", userName);
+ DistributedCache.addCacheFile(firstCacheDirPublic.toUri(), conf2);
+ DistributedCache.addCacheFile(firstCacheDirPrivate.toUri(), conf2);
+
+ TrackerDistributedCacheManager.determineTimestamps(conf2);
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+
+ // Task localizing for second job
+ JobID job2Id = new JobID("jt", 2);
+ handle = manager.newTaskDistributedCacheManager(job2Id, conf2);
+ handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
+ TaskTracker.getPrivateDistributedCacheDir(userName));
+ long[] sizes2 = JobLocalizer.downloadPrivateCache(conf2);
+ if (sizes2 != null) {
+ for (int j = 0; j < sizes2.length; j++) {
+ LOG.info("size is: " + sizes2[j]);
+ }
+ manager.setArchiveSizes(job2Id, sizes2);
+ }
handle.release();
for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
assertEquals(0, manager.getReferenceCount(c.getStatus()));
+ long filesize = FileUtil.getDU(new File(c.getStatus().localizedLoadPath.getParent().toString()));
+ assertTrue("filesize is not greater than 0", filesize > 0);
+ assertEquals(filesize, c.getStatus().size);
}
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
createPrivateTempFile(thirdCacheFile);
// Configures another job with three regular files.
- Job job2 = new Job(conf);
- Configuration conf2 = job2.getConfiguration();
- conf2.set("user.name", userName);
+ Job job3 = new Job(conf);
+ Configuration conf3 = job3.getConfiguration();
+ conf3.set("user.name", userName);
// add a file that would get failed to localize
- DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf2);
+ DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf3);
// add a file that is already localized by different job
- DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
+ DistributedCache.addCacheFile(secondCacheFile.toUri(), conf3);
// add a file that is never localized
- DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
+ DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf3);
- TrackerDistributedCacheManager.determineTimestamps(conf2);
- TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+ TrackerDistributedCacheManager.determineTimestamps(conf3);
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf3);
- // Task localizing for second job
+ // Task localizing for third job
// localization for the "firstCacheFile" will fail.
- handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), conf2);
+ handle = manager.newTaskDistributedCacheManager(new JobID("jt", 3), conf3);
Throwable th = null;
try {
- handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
+ handle.setupCache(conf3, TaskTracker.getPublicDistributedCacheDir(),
TaskTracker.getPrivateDistributedCacheDir(userName));
- JobLocalizer.downloadPrivateCache(conf2);
+ JobLocalizer.downloadPrivateCache(conf3);
} catch (IOException e) {
th = e;
LOG.info("Exception during setup", e);
@@ -939,6 +990,13 @@ public class TestTrackerDistributedCache
createTempFile(p, TEST_FILE_SIZE);
}
+ static void createTempDir(Path p) throws IOException {
+ File dir = new File(p.toString());
+ if (!dir.mkdirs() && !dir.isDirectory()) { throw new IOException("could not create temp directory: " + p); }
+ FileSystem.LOG.info("created temp directory: " + p);
+
+ }
+
static void createTempFile(Path p, int size) throws IOException {
File f = new File(p.toString());
FileOutputStream os = new FileOutputStream(f);
@@ -961,12 +1019,30 @@ public class TestTrackerDistributedCache
FileUtil.chmod(p.toString(), "0770",true);
}
+ static void createPublicTempDir(Path p)
+ throws IOException, InterruptedException {
+ createTempDir(p);
+ FileUtil.chmod(p.toString(), "0777",true);
+ }
+
+ static void createPrivateTempDir(Path p)
+ throws IOException, InterruptedException {
+ createTempDir(p);
+ FileUtil.chmod(p.toString(), "0770",true);
+ }
+
@Override
protected void tearDown() throws IOException {
new File(firstCacheFile.toString()).delete();
new File(secondCacheFile.toString()).delete();
new File(firstCacheFilePublic.toString()).delete();
new File(secondCacheFilePublic.toString()).delete();
+
+ new File(firstCacheFileInDirPublic.toString()).delete();
+ new File(firstCacheFileInDirPrivate.toString()).delete();
+ new File(firstCacheDirPrivate.toString()).delete();
+ new File(firstCacheDirPublic.toString()).delete();
+
FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
}