Author: cdouglas
Date: Mon May 16 22:53:09 2011
New Revision: 1103940
URL: http://svn.apache.org/viewvc?rev=1103940&view=rev
Log:
MAPREDUCE-2479. Move distributed cache cleanup to a background task,
backporting MAPREDUCE-1568. Contributed by Robert Joseph Evans
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1103940&r1=1103939&r2=1103940&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Mon May 16 22:53:09 2011
@@ -21,6 +21,9 @@ Release 0.20.205.0 - unreleased
HADOOP-7274. Fix typos in IOUtils. (Jonathan Eagles via cdouglas)
+ MAPREDUCE-2479. Move distributed cache cleanup to a background task,
+ backporting MAPREDUCE-1568. (Robert Joseph Evans via cdouglas)
+
Release 0.20.204.0 - unreleased
BUG FIXES
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1103940&r1=1103939&r2=1103940&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Mon May 16 22:53:09 2011
@@ -22,12 +22,12 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DateFormat;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
@@ -76,14 +76,6 @@ public class TrackerDistributedCacheMana
private static final FsPermission PUBLIC_CACHE_OBJECT_PERM =
FsPermission.createImmutable((short) 0755);
- // For holding the properties of each cache directory
- static class CacheDir {
- long size;
- long subdirs;
- }
- private TreeMap<Path, CacheDir> baseDirProperties =
- new TreeMap<Path, CacheDir>();
-
// default total cache size (10GB)
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
@@ -100,6 +92,9 @@ public class TrackerDistributedCacheMana
private Configuration trackerConf;
private static final Random random = new Random();
+
+ BaseDirManager baseDirManager = new BaseDirManager();
+ CleanupThread cleanupThread;
public TrackerDistributedCacheManager(Configuration conf,
TaskController controller
@@ -116,6 +111,7 @@ public class TrackerDistributedCacheMana
("mapreduce.tasktracker.local.cache.numberdirectories",
DEFAULT_CACHE_SUBDIR_LIMIT);
this.taskController = controller;
+ this.cleanupThread = new CleanupThread(conf);
}
/**
@@ -197,7 +193,7 @@ public class TrackerDistributedCacheMana
// Increase the size and sub directory count of the cache
// from baseDirSize and baseDirNumberSubDir.
- addCacheInfoUpdate(lcacheStatus);
+ baseDirManager.addCacheInfoUpdate(lcacheStatus);
}
}
lcacheStatus.initComplete();
@@ -206,27 +202,6 @@ public class TrackerDistributedCacheMana
lcacheStatus, fileStatus,
isArchive);
}
}
-
- // try deleting stuff if you can
- long size = 0;
- long numberSubdirs = 0;
- synchronized (lcacheStatus) {
- synchronized (baseDirProperties) {
- CacheDir cacheDir = baseDirProperties.get(lcacheStatus.getBaseDir());
- if (cacheDir != null) {
- size = cacheDir.size;
- numberSubdirs = cacheDir.subdirs;
- } else {
- LOG.warn("Cannot find size and number of subdirectories of" +
- " baseDir: " + lcacheStatus.getBaseDir());
- }
- }
- }
-
- if (allowedCacheSize < size || allowedCacheSubdirs < numberSubdirs) {
- // try some cache deletions
- compactCache(conf);
- }
} catch (IOException ie) {
synchronized (lcacheStatus) {
// release this cache
@@ -257,7 +232,7 @@ public class TrackerDistributedCacheMana
if (size != 0) {
synchronized (status) {
status.size = size;
- addCacheInfoUpdate(status);
+ baseDirManager.addCacheInfoUpdate(status);
}
}
}
@@ -289,54 +264,6 @@ public class TrackerDistributedCacheMana
return user;
}
-
- // To delete the caches which have a refcount of zero
-
- private void compactCache(Configuration conf) throws IOException {
- List<CacheStatus> deleteList = new LinkedList<CacheStatus>();
- // try deleting cache Status with refcount of zero
- synchronized (cachedArchives) {
- for (Iterator<String> it = cachedArchives.keySet().iterator();
- it.hasNext();) {
- String cacheId = it.next();
- CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- // if reference count is zero
- // mark the cache for deletion
- if (lcacheStatus.refcount == 0) {
- // delete this cache entry from the global list
- // and mark the localized file for deletion
- deleteList.add(lcacheStatus);
- it.remove();
- }
- }
- }
-
- // do the deletion, after releasing the global lock
- for (CacheStatus lcacheStatus : deleteList) {
- synchronized (lcacheStatus) {
- Path potentialDeletee = lcacheStatus.localizedLoadPath;
- Path localizedDir = lcacheStatus.getLocalizedUniqueDir();
- if (lcacheStatus.user == null) {
- LOG.info("Deleted path " + localizedDir);
- try {
- localFs.delete(localizedDir, true);
- } catch (IOException e) {
- LOG.warn("Could not delete distributed cache empty directory "
- + localizedDir, e);
- }
- } else {
- LOG.info("Deleted path " + localizedDir + " as " +
lcacheStatus.user);
- String base = lcacheStatus.getBaseDir().toString();
- String userDir = TaskTracker.getUserDir(lcacheStatus.user);
- int skip = base.length() + 1 + userDir.length() + 1;
- String relative = localizedDir.toString().substring(skip);
- taskController.deleteAsUser(lcacheStatus.user, relative);
- }
- deleteCacheInfoUpdate(lcacheStatus);
- }
- }
- }
-
/*
* Returns the relative path of the dir this cache will be localized in
* relative path that this cache will be localized in. For
@@ -538,7 +465,7 @@ public class TrackerDistributedCacheMana
// Increase the size and sub directory count of the cache
// from baseDirSize and baseDirNumberSubDir.
- addCacheInfoUpdate(cacheStatus);
+ baseDirManager.addCacheInfoUpdate(cacheStatus);
LOG.info(String.format("Cached %s as %s",
cache.toString(), cacheStatus.localizedLoadPath));
@@ -614,28 +541,31 @@ public class TrackerDistributedCacheMana
}
static class CacheStatus {
- // the local load path of this cache
- Path localizedLoadPath;
-
- //the base dir where the cache lies
- Path localizedBaseDir;
-
- //the size of this cache
- long size;
-
- // number of instances using this cache
- int refcount;
-
- // is it initialized ?
- boolean inited = false;
-
+ //
+ // This field should be accessed under global cachedArchives lock.
+ //
+ int refcount; // number of instances using this cache
+
+ //
+ // The following two fields should be accessed under
+ // individual cacheStatus lock.
+ //
+ long size; //the size of this cache.
+ boolean inited = false; // is it initialized ?
+
+ //
+ // The following five fields are Immutable.
+ //
+
// The sub directory (tasktracker/archive or tasktracker/user/archive),
// under which the file will be localized
Path subDir;
-
// unique string used in the construction of local load path
String uniqueString;
-
+ // the local load path of this cache
+ Path localizedLoadPath;
+ //the base dir where the cache lies
+ Path localizedBaseDir;
// The user that owns the cache entry or null if it is public
final String user;
@@ -940,49 +870,175 @@ public class TrackerDistributedCacheMana
return path;
}
+
+ /**
+ * A thread to check and cleanup the unused files periodically
+ */
+ private class CleanupThread extends Thread {
+ // How often do we check if we need to clean up cache files?
+ private long cleanUpCheckPeriod = 60000L; // 1 minute
+ public CleanupThread(Configuration conf) {
+ cleanUpCheckPeriod =
+ conf.getLong("mapreduce.tasktracker.distributedcache.checkperiod",
+ cleanUpCheckPeriod);
+ }
+ private volatile boolean running = true;
+
+ public void stopRunning() {
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ Thread.sleep(cleanUpCheckPeriod);
+ baseDirManager.checkAndCleanup();
+ } catch (Exception e) {
+ LOG.error("Exception in DistributedCache CleanupThread.", e);
+ // This thread should keep running and never crash.
+ }
+ }
+ }
+ }
+
+  /**
+   * Tracks, per cache base directory, the total size and number of cache
+   * entries (sub-directories), and deletes unreferenced cache entries from
+   * base directories that exceed {@code allowedCacheSize} or
+   * {@code allowedCacheSubdirs}.
+   */
+  private class BaseDirManager {
+
+    // For holding the properties of each cache directory
+    private class CacheDir {
+      long size;    // sum of CacheStatus.size over the entries in the dir
+      long subdirs; // number of cache entries (sub-directories) in the dir
+    }
+
+    // Base directory -> its totals. Every access below synchronizes on this
+    // map.
+    private TreeMap<Path, BaseDirManager.CacheDir> properties =
+      new TreeMap<Path, BaseDirManager.CacheDir>();
+
+    // Caller must hold the lock on 'properties'; 'p' must be a key of it.
+    private long getDirSize(Path p) {
+      return properties.get(p).size;
+    }
+
+    // Caller must hold the lock on 'properties'; 'p' must be a key of it.
+    private long getDirSubdirs(Path p) {
+      return properties.get(p).subdirs;
+    }
+
+    /**
+     * Delete the localized files of every cache entry whose refcount is zero
+     * and whose base directory is over its size or sub-directory limit.
+     * Selected entries are removed from {@code cachedArchives} under its
+     * lock; the file deletion itself happens after that lock is released.
+     * A failure to delete one entry is logged and does not stop the others.
+     *
+     * @throws IOException declared for existing callers; per-entry deletion
+     *         failures are logged rather than thrown
+     */
+    void checkAndCleanup() throws IOException {
+      Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
+      Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
+      // 1. Find the base directories that exceed either limit.
+      synchronized (properties) {
+        for (Path baseDir : properties.keySet()) {
+          if (allowedCacheSize < getDirSize(baseDir) ||
+              allowedCacheSubdirs < getDirSubdirs(baseDir)) {
+            toBeCleanedBaseDir.add(baseDir);
+          }
+        }
+      }
+      // 2. try deleting cache Status with refcount of zero
+      synchronized (cachedArchives) {
+        for (Iterator<String> it = cachedArchives.keySet().iterator();
+            it.hasNext();) {
+          String cacheId = it.next();
+          CacheStatus cacheStatus = cachedArchives.get(cacheId);
+          if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
+            synchronized (cacheStatus) {
+              // if reference count is zero mark the cache for deletion
+              if (cacheStatus.refcount == 0) {
+                // delete this cache entry from the global list
+                // and mark the localized file for deletion
+                toBeDeletedCache.add(cacheStatus);
+                it.remove();
+              }
+            }
+          }
+        }
+      }
+
+      // 3. do the deletion, after releasing the global lock
+      for (CacheStatus cacheStatus : toBeDeletedCache) {
+        synchronized (cacheStatus) {
+          deleteLocalizedCache(cacheStatus);
+          // The entry is already gone from cachedArchives, so its share of
+          // the base dir totals must be released here even if the files
+          // could not be removed; otherwise the dir stays "over limit".
+          deleteCacheInfoUpdate(cacheStatus);
+        }
+      }
+    }
+
+    /**
+     * Remove the localized directory of one cache entry: directly through
+     * localFs for public entries (user == null), through the TaskController
+     * for private ones. Failures are logged, not thrown. Caller must hold
+     * the lock on cacheStatus.
+     */
+    private void deleteLocalizedCache(CacheStatus cacheStatus) {
+      Path localizedDir = cacheStatus.getLocalizedUniqueDir();
+      if (cacheStatus.user == null) {
+        TrackerDistributedCacheManager.LOG.info("Deleted path " +
+            localizedDir);
+        try {
+          localFs.delete(localizedDir, true);
+        } catch (IOException e) {
+          TrackerDistributedCacheManager.LOG.warn(
+              "Could not delete distributed cache empty directory "
+              + localizedDir, e);
+        }
+      } else {
+        TrackerDistributedCacheManager.LOG.info("Deleted path " +
+            localizedDir + " as " + cacheStatus.user);
+        // deleteAsUser is given the path with "<baseDir>/<userDir>/" stripped.
+        String base = cacheStatus.getBaseDir().toString();
+        String userDir = TaskTracker.getUserDir(cacheStatus.user);
+        int skip = base.length() + 1 + userDir.length() + 1;
+        String relative = localizedDir.toString().substring(skip);
+        try {
+          taskController.deleteAsUser(cacheStatus.user, relative);
+        } catch (IOException e) {
+          TrackerDistributedCacheManager.LOG.warn(
+              "Could not delete distributed cache directory "
+              + localizedDir + " as " + cacheStatus.user, e);
+        }
+      }
+    }
+
/**
- * Decrement the size and sub directory count of the cache from baseDirSize
- * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
+     * Decrement the size and sub directory count of the cache's base
+     * directory in {@code properties}. Does nothing if the cache was never
+     * initialized. Have to lock cacheStatus before calling this.
* @param cacheStatus cache status of the cache is deleted
*/
- private void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
+    public void deleteCacheInfoUpdate(CacheStatus cacheStatus) {
if (!cacheStatus.inited) {
// if it is not created yet, do nothing.
return;
}
// decrement the size of the cache from baseDirSize
- synchronized (baseDirProperties) {
- CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+      synchronized (properties) {
+        BaseDirManager.CacheDir cacheDir =
+          properties.get(cacheStatus.getBaseDir());
if (cacheDir != null) {
cacheDir.size -= cacheStatus.size;
cacheDir.subdirs--;
} else {
LOG.warn("Cannot find size and number of subdirectories of" +
- " baseDir: " + cacheStatus.getBaseDir());
+          " baseDir: " + cacheStatus.getBaseDir());
}
}
}
-
+
/**
- * Update the maps baseDirSize and baseDirNumberSubDir when adding cache.
- * Increase the size and sub directory count of the cache from baseDirSize
- * and baseDirNumberSubDir. Have to lock lcacheStatus before calling this.
+     * Update {@code properties} when a cache entry is added: increase the
+     * size and sub directory count of the entry's base directory, creating
+     * its record on first use. Have to lock cacheStatus before calling this.
* @param cacheStatus cache status of the cache is added
*/
- private void addCacheInfoUpdate(CacheStatus cacheStatus) {
+    public void addCacheInfoUpdate(CacheStatus cacheStatus) {
long cacheSize = cacheStatus.size;
- // decrement the size of the cache from baseDirSize
- synchronized (baseDirProperties) {
- CacheDir cacheDir = baseDirProperties.get(cacheStatus.getBaseDir());
+      // increment the size and subdir count of the cache's base directory
+      synchronized (properties) {
+        BaseDirManager.CacheDir cacheDir =
+          properties.get(cacheStatus.getBaseDir());
if (cacheDir != null) {
cacheDir.size += cacheSize;
cacheDir.subdirs++;
} else {
- cacheDir = new CacheDir();
+        cacheDir = new BaseDirManager.CacheDir();
cacheDir.size = cacheSize;
cacheDir.subdirs = 1;
- baseDirProperties.put(cacheStatus.getBaseDir(), cacheDir);
+        properties.put(cacheStatus.getBaseDir(), cacheDir);
}
}
}
+  }
+
+  /**
+   * Launch the background thread that periodically removes unreferenced
+   * cache entries. A java.lang.Thread can only be started once, so call this
+   * at most once per manager.
+   */
+  public void startCleanupThread() {
+    cleanupThread.start();
+  }
+
+  /**
+   * Tell the background cleanup thread to exit and wake it from its sleep.
+   * Returns immediately; it does not wait for the thread to terminate.
+   */
+  public void stopCleanupThread() {
+    CleanupThread worker = cleanupThread;
+    // Clear the flag before interrupting so the woken thread sees it and exits.
+    worker.stopRunning();
+    worker.interrupt();
+  }
}
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1103940&r1=1103939&r2=1103940&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon May 16 22:53:09 2011
@@ -812,7 +812,8 @@ public class TaskTracker implements MRCo
// Initialize DistributedCache
this.distributedCacheManager = new TrackerDistributedCacheManager(
this.fConf, taskController);
-
+ this.distributedCacheManager.startCleanupThread();
+
this.jobClient = (InterTrackerProtocol)
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
@@ -1365,6 +1366,7 @@ public class TaskTracker implements MRCo
this.mapLauncher.interrupt();
this.reduceLauncher.interrupt();
+ this.distributedCacheManager.stopCleanupThread();
jvmManager.stop();
// shutdown RPC connections
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1103940&r1=1103939&r2=1103940&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Mon May 16 22:53:09 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginException;
@@ -564,10 +565,13 @@ public class TestTrackerDistributedCache
Configuration conf2 = new Configuration(conf);
conf2.set("mapred.local.dir", ROOT_MAPRED_LOCAL_DIR.toString());
conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+ conf2.setLong("mapreduce.tasktracker.distributedcache.checkperiod", 200);
// 200 ms
refreshConf(conf2);
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf2, taskController);
+ manager.startCleanupThread();
+ try {
FileSystem localfs = FileSystem.getLocal(conf2);
long now = System.currentTimeMillis();
String userName = getJobOwnerName();
@@ -601,9 +605,9 @@ public class TestTrackerDistributedCache
fs.getFileStatus(secondCacheFilePublic), false,
fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
cfile2);
- assertFalse("DistributedCache failed deleting old" +
- " cache when the cache store is full.",
- localfs.exists(firstLocalCache));
+ checkCacheDeletion(localfs, firstLocalCache,
+ "DistributedCache failed deleting old" +
+ " cache when the cache store is full");
// find the root directory of distributed caches
Path firstCursor = firstLocalCache;
@@ -633,8 +637,12 @@ public class TestTrackerDistributedCache
conf2.setLong("local.cache.size", LOCAL_CACHE_LIMIT * 10);
conf2.setLong("mapreduce.tasktracker.local.cache.numberdirectories",
LOCAL_CACHE_SUBDIR_LIMIT);
+ manager.stopCleanupThread();
+
manager =
new TrackerDistributedCacheManager(conf2, taskController);
+ manager.startCleanupThread();
+
// Now we test the number of sub directories limit
// Create the temporary cache files to be used in the tests.
Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
@@ -678,9 +686,9 @@ public class TestTrackerDistributedCache
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(fourthCacheFile), false,
fs.getFileStatus(fourthCacheFile).getModificationTime(), false,
cfile4);
- assertFalse("DistributedCache failed deleting old" +
- " cache when the cache exceeds the number of sub directories limit.",
- localfs.exists(thirdLocalCache));
+ checkCacheDeletion(localfs, thirdLocalCache,
+ "DistributedCache failed deleting old" +
+ " cache when the cache exceeds the number of sub directories limit.");
assertFalse
("DistributedCache did not delete the gensym'ed distcache "
@@ -691,8 +699,30 @@ public class TestTrackerDistributedCache
// Clean up the files created in this test
new File(thirdCacheFile.toString()).delete();
new File(fourthCacheFile.toString()).delete();
+ } finally {
+ manager.stopCleanupThread();
+ }
}
+  /**
+   * Periodically checks whether a file still exists and returns as soon as
+   * it is gone. Checks every 100 ms, 300 times; if the file is still there
+   * after those ~30 seconds (plus one final check), the test fails with
+   * {@code msg}.
+   */
+  private void checkCacheDeletion(FileSystem fs, Path cache, String msg)
+      throws Exception {
+    final long pollIntervalMs = 100L;
+    final int maxChecks = 300; // 300 * 100 ms = 30 seconds
+    for (int i = 0; i < maxChecks; i++) {
+      if (!fs.exists(cache)) {
+        return;
+      }
+      TimeUnit.MILLISECONDS.sleep(pollIntervalMs);
+    }
+    // Still present after ~30 seconds: take one last look (the file may
+    // have been deleted during the final sleep), then fail the test.
+    assertFalse(msg, fs.exists(cache));
+  }
+
public void testFileSystemOtherThanDefault() throws Exception {
if (!canRun()) {
return;