Author: omalley
Date: Fri Mar 4 03:34:35 2011
New Revision: 1077036
URL: http://svn.apache.org/viewvc?rev=1077036&view=rev
Log:
commit eec554140a1d7fcd0efe58007e8e228e70eef645
Author: Hemanth Yamijala <[email protected]>
Date: Sun Oct 25 16:31:08 2009 +0530
MAPREDUCE-1098 from
http://issues.apache.org/jira/secure/attachment/12423137/patch-1098-ydist.txt
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1098. Fixed the distributed-cache to not do i/o while
+ holding a global lock. (Amareshwari Sriramadasu via acmurthy)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=1077036&r1=1077035&r2=1077036&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
Fri Mar 4 03:34:35 2011
@@ -124,6 +124,8 @@ public class DistributedCache {
private static final Log LOG =
LogFactory.getLog(DistributedCache.class);
+ private static Random random = new Random();
+
/**
* Get the locally cached file or archive; it could either be
* previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -155,7 +157,8 @@ public class DistributedCache {
Path currentWorkDir)
throws IOException {
return getLocalCache(cache, conf, baseDir, fileStatus, isArchive,
- confFileStamp, currentWorkDir, true);
+ confFileStamp, currentWorkDir, true,
+ new LocalDirAllocator("mapred.local.dir"));
}
/**
* Get the locally cached file or archive; it could either be
@@ -166,7 +169,7 @@ public class DistributedCache {
* or hostname:port is provided the file is assumed to be in the filesystem
* being used in the Configuration
* @param conf The Confguration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the
files/archives
+ * @param subDir The sub cache Dir where you want to localize the
files/archives
* @param fileStatus The file status on the dfs.
* @param isArchive if the cache is an archive or a file. In case it is an
* archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
@@ -181,39 +184,59 @@ public class DistributedCache {
* @param honorSymLinkConf if this is false, then the symlinks are not
* created even if conf says so (this is required for an optimization in task
* launches
+ * @param lDirAllocator LocalDirAllocator of the tracker
* @return the path to directory where the archives are unjarred in case of
archives,
* the path to the file where the file is copied locally
* @throws IOException
*/
public static Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, FileStatus fileStatus,
+ Path subDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
- Path currentWorkDir, boolean honorSymLinkConf)
+ Path currentWorkDir, boolean honorSymLinkConf,
+ LocalDirAllocator lDirAllocator)
throws IOException {
- String cacheId = makeRelative(cache, conf);
+ String key = getKey(cache, conf, confFileStamp);
CacheStatus lcacheStatus;
Path localizedPath;
synchronized (cachedArchives) {
- lcacheStatus = cachedArchives.get(cacheId);
+ lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
// was never localized
- lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new
Path(cacheId)));
- cachedArchives.put(cacheId, lcacheStatus);
+ String cachePath = new Path (subDir,
+ new Path(String.valueOf(random.nextLong()),
+ makeRelative(cache, conf))).toString();
+ Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), conf);
+ lcacheStatus = new CacheStatus(
+ new Path(localPath.toString().replace(cachePath, "")), localPath);
+ cachedArchives.put(key, lcacheStatus);
}
-
- synchronized (lcacheStatus) {
- localizedPath = localizeCache(conf, cache, confFileStamp,
lcacheStatus,
- fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
- lcacheStatus.refcount++;
+ lcacheStatus.refcount++;
+ }
+
+ synchronized (lcacheStatus) {
+ if (!lcacheStatus.isInited()) {
+ localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
+ fileStatus, isArchive);
+ lcacheStatus.initComplete();
+ } else {
+ localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+ lcacheStatus, fileStatus, isArchive);
}
+ createSymlink(conf, cache, lcacheStatus, isArchive,
+ currentWorkDir, honorSymLinkConf);
}
-
+
// try deleting stuff if you can
long size = 0;
- synchronized (baseDirSize) {
- Long get = baseDirSize.get(baseDir);
- if ( get != null ) {
- size = get.longValue();
+ synchronized (lcacheStatus) {
+ synchronized (baseDirSize) {
+ Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+ if ( get != null ) {
+ size = get.longValue();
+ } else {
+ LOG.warn("Cannot find size of baseDir: " +
lcacheStatus.getBaseDir());
+ }
}
}
// setting the cache size to a default of 10GB
@@ -268,39 +291,52 @@ public class DistributedCache {
* is contained in.
* @throws IOException
*/
- public static void releaseCache(URI cache, Configuration conf)
+ public static void releaseCache(URI cache, Configuration conf, long
timeStamp)
throws IOException {
- String cacheId = makeRelative(cache, conf);
+ String cacheId = getKey(cache, conf, timeStamp);
synchronized (cachedArchives) {
CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- if (lcacheStatus == null)
+ if (lcacheStatus == null) {
+ LOG.warn("Cannot find localized cache: " + cache +
+ " (key: " + cacheId + ") in releaseCache!");
return;
- synchronized (lcacheStatus) {
- lcacheStatus.refcount--;
}
+ lcacheStatus.refcount--;
}
}
// To delete the caches which have a refcount of zero
private static void deleteCache(Configuration conf) throws IOException {
+ Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
// try deleting cache Status with refcount of zero
synchronized (cachedArchives) {
for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
String cacheId = (String) it.next();
CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- synchronized (lcacheStatus) {
- if (lcacheStatus.refcount == 0) {
- // delete this cache entry
- FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
- if ( dirSize != null ) {
- dirSize -= lcacheStatus.size;
- baseDirSize.put(lcacheStatus.baseDir, dirSize);
- }
- }
- it.remove();
+ if (lcacheStatus.refcount == 0) {
+ // delete this cache entry from the global list
+ // and mark the localized file for deletion
+ deleteSet.add(lcacheStatus);
+ it.remove();
+ }
+ }
+ }
+
+ // do the deletion, after releasing the global lock
+ for (CacheStatus lcacheStatus : deleteSet) {
+ synchronized (lcacheStatus) {
+ FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+ LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+ // decrement the size of the cache from baseDirSize
+ synchronized (baseDirSize) {
+ Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+ if ( dirSize != null ) {
+ dirSize -= lcacheStatus.size;
+ baseDirSize.put(lcacheStatus.baseDir, dirSize);
+ } else {
+ LOG.warn("Cannot find record of the baseDir: " +
+ lcacheStatus.baseDir + " during delete!");
}
}
}
@@ -333,128 +369,108 @@ public class DistributedCache {
return path;
}
- private static Path cacheFilePath(Path p) {
- return new Path(p, p.getName());
+ static String getKey(URI cache, Configuration conf, long timeStamp)
+ throws IOException {
+ return makeRelative(cache, conf) + String.valueOf(timeStamp);
+ }
+
+ private static Path checkCacheStatusValidity(Configuration conf,
+ URI cache, long confFileStamp,
+ CacheStatus cacheStatus,
+ FileStatus fileStatus,
+ boolean isArchive
+ ) throws IOException {
+ FileSystem fs = FileSystem.get(cache, conf);
+ // Has to be
+ if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
+ cacheStatus, fileStatus)) {
+ throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
+ " for cache-file: " + cache);
+ }
+ LOG.info(String.format("Using existing cache of %s->%s",
+ cache.toString(), cacheStatus.localLoadPath));
+ return cacheStatus.localLoadPath;
+ }
+
+ private static void createSymlink(Configuration conf, URI cache,
+ CacheStatus cacheStatus, boolean isArchive,
+ Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
+ boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
+ if(cache.getFragment() == null) {
+ doSymlink = false;
+ }
+ String link =
+ currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
+ File flink = new File(link);
+ if (doSymlink){
+ if (!flink.exists()) {
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
+ }
+ }
}
-
+
// the method which actually copies the caches locally and unjars/unzips them
// and does chmod for the files
private static Path localizeCache(Configuration conf,
URI cache, long confFileStamp,
CacheStatus cacheStatus,
FileStatus fileStatus,
- boolean isArchive,
- Path currentWorkDir,boolean
honorSymLinkConf)
+ boolean isArchive)
throws IOException {
- boolean doSymlink = honorSymLinkConf && getSymlink(conf);
- if(cache.getFragment() == null) {
- doSymlink = false;
- }
FileSystem fs = getFileSystem(cache, conf);
- String link = currentWorkDir.toString() + Path.SEPARATOR +
cache.getFragment();
- File flink = new File(link);
- if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
- cacheStatus, fileStatus)) {
- if (isArchive) {
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheStatus.localLoadPath.toString(),
- link);
- }
- return cacheStatus.localLoadPath;
- }
- else {
- if (doSymlink){
- if (!flink.exists())
-
FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
- link);
- }
- return cacheFilePath(cacheStatus.localLoadPath);
- }
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path parchive = null;
+ if (isArchive) {
+ parchive = new Path(cacheStatus.localLoadPath,
+ new Path(cacheStatus.localLoadPath.getName()));
} else {
- // remove the old archive
- // if the old archive cannot be removed since it is being used by another
- // job
- // return null
- if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
- throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
- + " is in use and cannot be refreshed");
-
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(cacheStatus.localLoadPath, true);
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
- if ( dirSize != null ) {
- dirSize -= cacheStatus.size;
- baseDirSize.put(cacheStatus.baseDir, dirSize);
- }
- }
- Path parchive = new Path(cacheStatus.localLoadPath,
- new Path(cacheStatus.localLoadPath.getName()));
-
- if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
- throw new IOException("Mkdirs failed to create directory " +
- cacheStatus.localLoadPath.toString());
- }
-
- String cacheId = cache.getPath();
- fs.copyToLocalFile(new Path(cacheId), parchive);
- if (isArchive) {
- String tmpArchive = parchive.toString().toLowerCase();
- File srcFile = new File(parchive.toString());
- File destDir = new File(parchive.getParent().toString());
- if (tmpArchive.endsWith(".jar")) {
- RunJar.unJar(srcFile, destDir);
- } else if (tmpArchive.endsWith(".zip")) {
- FileUtil.unZip(srcFile, destDir);
- } else if (isTarFile(tmpArchive)) {
- FileUtil.unTar(srcFile, destDir);
- }
- // else will not do anyhting
- // and copy the file into the dir as it is
+ parchive = cacheStatus.localLoadPath;
+ }
+
+ if (!localFs.mkdirs(parchive.getParent())) {
+ throw new IOException("Mkdirs failed to create directory " +
+ cacheStatus.localLoadPath.toString());
+ }
+ String cacheId = cache.getPath();
+ fs.copyToLocalFile(new Path(cacheId), parchive);
+ if (isArchive) {
+ String tmpArchive = parchive.toString().toLowerCase();
+ File srcFile = new File(parchive.toString());
+ File destDir = new File(parchive.getParent().toString());
+ if (tmpArchive.endsWith(".jar")) {
+ RunJar.unJar(srcFile, destDir);
+ } else if (tmpArchive.endsWith(".zip")) {
+ FileUtil.unZip(srcFile, destDir);
+ } else if (isTarFile(tmpArchive)) {
+ FileUtil.unTar(srcFile, destDir);
}
+ // else will not do anything
+ // and copy the file into the dir as it is
+ }
- long cacheSize = FileUtil.getDU(new
File(parchive.getParent().toString()));
- cacheStatus.size = cacheSize;
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
- if( dirSize == null ) {
- dirSize = Long.valueOf(cacheSize);
- } else {
- dirSize += cacheSize;
- }
- baseDirSize.put(cacheStatus.baseDir, dirSize);
- }
+ long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
+ cacheStatus.size = cacheSize;
+ synchronized (baseDirSize) {
+ Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+ if (dirSize == null) {
+ dirSize = Long.valueOf(cacheSize);
+ } else {
+ dirSize += cacheSize;
+ }
+ baseDirSize.put(cacheStatus.baseDir, dirSize);
+ }
- // do chmod here
- try {
- //Setting recursive permission to grant everyone read and execute
- FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
- } catch(InterruptedException e) {
+ // do chmod here
+ try {
+ //Setting recursive permission to grant everyone read and execute
+ FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+ } catch(InterruptedException e) {
LOG.warn("Exception in chmod" + e.toString());
- }
-
- // update cacheStatus to reflect the newly cached file
- cacheStatus.currentStatus = true;
- cacheStatus.mtime = getTimestamp(conf, cache);
- }
-
- if (isArchive){
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheStatus.localLoadPath.toString(),
- link);
- }
- return cacheStatus.localLoadPath;
- }
- else {
- if (doSymlink){
- if (!flink.exists())
-
FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
- link);
- }
- return cacheFilePath(cacheStatus.localLoadPath);
}
+
+ // update cacheStatus to reflect the newly cached file
+ cacheStatus.mtime = getTimestamp(conf, cache);
+ return cacheStatus.localLoadPath;
}
private static boolean isTarFile(String filename) {
@@ -469,27 +485,22 @@ public class DistributedCache {
FileStatus fileStatus)
throws IOException {
// check for existence of the cache
- if (lcacheStatus.currentStatus == false) {
- return false;
+ long dfsFileStamp;
+ if (fileStatus != null) {
+ dfsFileStamp = fileStatus.getModificationTime();
} else {
- long dfsFileStamp;
- if (fileStatus != null) {
- dfsFileStamp = fileStatus.getModificationTime();
- } else {
- dfsFileStamp = getTimestamp(conf, cache);
- }
+ dfsFileStamp = getTimestamp(conf, cache);
+ }
- // ensure that the file on hdfs hasn't been modified since the job
started
- if (dfsFileStamp != confFileStamp) {
- LOG.fatal("File: " + cache + " has changed on HDFS since job started");
- throw new IOException("File: " + cache +
- " has changed on HDFS since job started");
- }
+ // ensure that the file on hdfs hasn't been modified since the job started
+ if (dfsFileStamp != confFileStamp) {
+ LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+ throw new IOException("File: " + cache +
+ " has changed on HDFS since job started");
+ }
- if (dfsFileStamp != lcacheStatus.mtime) {
- // needs refreshing
- return false;
- }
+ if (dfsFileStamp != lcacheStatus.mtime) {
+ return false;
}
return true;
@@ -841,9 +852,6 @@ public class DistributedCache {
}
private static class CacheStatus {
- // false, not loaded yet, true is loaded
- boolean currentStatus;
-
// the local load path of this cache
Path localLoadPath;
@@ -858,16 +866,33 @@ public class DistributedCache {
// the cache-file modification time
long mtime;
+
+ // is it initialized?
+ boolean inited = false;
public CacheStatus(Path baseDir, Path localLoadPath) {
super();
- this.currentStatus = false;
this.localLoadPath = localLoadPath;
this.refcount = 0;
this.mtime = -1;
this.baseDir = baseDir;
this.size = 0;
}
+
+ // get the base dir for the cache
+ Path getBaseDir() {
+ return baseDir;
+ }
+
+ // Is it initialized?
+ boolean isInited() {
+ return inited;
+ }
+
+ // mark it as initialized
+ void initComplete() {
+ inited = true;
+ }
}
/**
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077036&r1=1077035&r2=1077036&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
Fri Mar 4 03:34:35 2011
@@ -178,21 +178,15 @@ abstract class TaskRunner extends Thread
fileSystem = FileSystem.get(archives[i], conf);
fileStatus = fileSystem.getFileStatus(
new Path(archives[i].getPath()));
- String cacheId = DistributedCache.makeRelative(archives[i],conf);
- String cachePath = TaskTracker.getCacheSubdir() +
- Path.SEPARATOR + cacheId;
-
- localPath = lDirAlloc.getLocalPathForWrite(cachePath,
- fileStatus.getLen(), conf);
- baseDir = localPath.toString().replace(cacheId, "");
p[i] = DistributedCache.getLocalCache(archives[i], conf,
- new Path(baseDir),
+ new Path(TaskTracker.getCacheSubdir()),
fileStatus,
true, Long.parseLong(
archivesTimestamps[i]),
new Path(workDir.
getAbsolutePath()),
- false);
+ false,
+ lDirAlloc);
}
DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
@@ -204,21 +198,15 @@ abstract class TaskRunner extends Thread
fileSystem = FileSystem.get(files[i], conf);
fileStatus = fileSystem.getFileStatus(
new Path(files[i].getPath()));
- String cacheId = DistributedCache.makeRelative(files[i], conf);
- String cachePath = TaskTracker.getCacheSubdir() +
- Path.SEPARATOR + cacheId;
-
- localPath = lDirAlloc.getLocalPathForWrite(cachePath,
- fileStatus.getLen(), conf);
- baseDir = localPath.toString().replace(cacheId, "");
p[i] = DistributedCache.getLocalCache(files[i], conf,
- new Path(baseDir),
+ new Path(TaskTracker.getCacheSubdir()),
fileStatus,
false, Long.parseLong(
fileTimestamps[i]),
new Path(workDir.
getAbsolutePath()),
- false);
+ false,
+ lDirAlloc);
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
@@ -537,14 +525,19 @@ abstract class TaskRunner extends Thread
try{
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
+ String[] archivesTimestamps =
+ DistributedCache.getArchiveTimestamps(conf);
+ String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
if (archives != null){
for (int i = 0; i < archives.length; i++){
- DistributedCache.releaseCache(archives[i], conf);
+ DistributedCache.releaseCache(archives[i], conf,
+ Long.parseLong(archivesTimestamps[i]));
}
}
if (files != null){
for(int i = 0; i < files.length; i++){
- DistributedCache.releaseCache(files[i], conf);
+ DistributedCache.releaseCache(files[i], conf,
+ Long.parseLong(fileTimestamps[i]));
}
}
}catch(IOException ie){
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java?rev=1077036&r1=1077035&r2=1077036&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java
Fri Mar 4 03:34:35 2011
@@ -8,6 +8,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import junit.framework.TestCase;
@@ -15,11 +16,10 @@ import junit.framework.TestCase;
public class TestDistributedCache extends TestCase {
static final URI LOCAL_FS = URI.create("file:///");
- private static String TEST_CACHE_BASE_DIR =
- new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
- .toString().replace(' ', '+');
+ private static String TEST_CACHE_BASE_DIR = "cachebasedir";
private static String TEST_ROOT_DIR =
System.getProperty("test.build.data", "/tmp/distributedcache");
+ private static String MAPRED_LOCAL_DIR = TEST_ROOT_DIR + "/mapred/local";
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
private Configuration conf;
@@ -34,6 +34,7 @@ public class TestDistributedCache extend
protected void setUp() throws IOException {
conf = new Configuration();
conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+ conf.set("mapred.local.dir", MAPRED_LOCAL_DIR);
localfs = FileSystem.get(LOCAL_FS, conf);
firstCacheFile = new Path(TEST_ROOT_DIR+"/firstcachefile");
secondCacheFile = new Path(TEST_ROOT_DIR+"/secondcachefile");
@@ -43,15 +44,19 @@ public class TestDistributedCache extend
/** test delete cache */
public void testDeleteCache() throws Exception {
- DistributedCache.getLocalCache(firstCacheFile.toUri(), conf, new
Path(TEST_CACHE_BASE_DIR),
- false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
- DistributedCache.releaseCache(firstCacheFile.toUri(), conf);
+ long now = System.currentTimeMillis();
+ DistributedCache.getLocalCache(firstCacheFile.toUri(), conf,
+ new Path(TEST_CACHE_BASE_DIR), localfs.getFileStatus(firstCacheFile),
+ false, now, new Path(TEST_ROOT_DIR));
+ DistributedCache.releaseCache(firstCacheFile.toUri(), conf, now);
//in above code,localized a file of size 4K and then release the cache
which will cause the cache
//be deleted when the limit goes out. The below code localize another
cache which's designed to
//sweep away the first cache.
- DistributedCache.getLocalCache(secondCacheFile.toUri(), conf, new
Path(TEST_CACHE_BASE_DIR),
- false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
- FileStatus[] dirStatuses = localfs.listStatus(new
Path(TEST_CACHE_BASE_DIR));
+ DistributedCache.getLocalCache(secondCacheFile.toUri(), conf,
+ new Path(TEST_CACHE_BASE_DIR), localfs.getFileStatus(firstCacheFile),
+ false, now, new Path(TEST_ROOT_DIR));
+ FileStatus[] dirStatuses = localfs.listStatus(
+ new Path(MAPRED_LOCAL_DIR, TEST_CACHE_BASE_DIR));
assertTrue("DistributedCache failed deleting old cache when the cache
store is full.",
dirStatuses.length > 1);
}