Author: omalley
Date: Fri Mar 4 03:40:56 2011
New Revision: 1077104
URL: http://svn.apache.org/viewvc?rev=1077104&view=rev
Log:
commit f7997dc8de4e1f2d2e4a3c8dd53fe1071613e6ea
Author: Hemanth Yamijala <[email protected]>
Date: Mon Jan 11 20:43:20 2010 +0530
Reverting patch https://issues.apache.org/jira/secure/attachment/12423137/patch-1098-ydist.txt for MAPREDUCE:1098
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java
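For orientation before the hunks: the revert restores the pre-MAPREDUCE-1098 DistributedCache API, in which the caller picks the base cache directory itself (TaskRunner resolves one through LocalDirAllocator before the call) and releaseCache takes no timestamp. The sketch below mirrors the restored TaskRunner flow; the class, the "taskTracker/archive" stand-in for TaskTracker.getCacheSubdir(), and the variable names are illustrative and not part of the patch.

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: localize one cache file the way the restored TaskRunner
    // code below does, then release it. Not part of the patch.
    class CacheUsageSketch {
      static Path localizeAndRelease(URI cacheUri, Configuration conf, long timestamp,
                                     Path workDir) throws IOException {
        FileStatus fileStatus =
            FileSystem.get(cacheUri, conf).getFileStatus(new Path(cacheUri.getPath()));
        LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");

        // The caller, not DistributedCache, resolves a writable local directory first.
        // "taskTracker/archive" stands in for TaskTracker.getCacheSubdir().
        String cacheId = DistributedCache.makeRelative(cacheUri, conf);
        String cachePath = "taskTracker/archive" + Path.SEPARATOR + cacheId;
        Path localPath = lDirAlloc.getLocalPathForWrite(cachePath, fileStatus.getLen(), conf);
        String baseDir = localPath.toString().replace(cacheId, "");

        // The chosen baseDir is passed in; after the revert there is no LocalDirAllocator
        // parameter and the cache entry is keyed without the timestamp.
        Path localized = DistributedCache.getLocalCache(cacheUri, conf, new Path(baseDir),
            fileStatus, false, timestamp, workDir, false);

        // releaseCache likewise drops the timestamp argument.
        DistributedCache.releaseCache(cacheUri, conf);
        return localized;
      }
    }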
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=1077104&r1=1077103&r2=1077104&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/filecache/DistributedCache.java Fri Mar 4 03:40:56 2011
@@ -124,8 +124,6 @@ public class DistributedCache {
private static final Log LOG =
LogFactory.getLog(DistributedCache.class);
- private static Random random = new Random();
-
/**
* Get the locally cached file or archive; it could either be
* previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -157,8 +155,7 @@ public class DistributedCache {
Path currentWorkDir)
throws IOException {
return getLocalCache(cache, conf, baseDir, fileStatus, isArchive,
- confFileStamp, currentWorkDir, true,
- new LocalDirAllocator("mapred.local.dir"));
+ confFileStamp, currentWorkDir, true);
}
/**
* Get the locally cached file or archive; it could either be
@@ -169,7 +166,7 @@ public class DistributedCache {
* or hostname:port is provided the file is assumed to be in the filesystem
* being used in the Configuration
* @param conf The Confguration file which contains the filesystem
- * @param subDir The sub cache Dir where you want to localize the files/archives
+ * @param baseDir The base cache Dir where you wnat to localize the files/archives
* @param fileStatus The file status on the dfs.
* @param isArchive if the cache is an archive or a file. In case it is an
* archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
@@ -184,59 +181,39 @@ public class DistributedCache {
* @param honorSymLinkConf if this is false, then the symlinks are not
* created even if conf says so (this is required for an optimization in task
* launches
- * @param lDirAllocator LocalDirAllocator of the tracker
* @return the path to directory where the archives are unjarred in case of archives,
* the path to the file where the file is copied locally
* @throws IOException
*/
public static Path getLocalCache(URI cache, Configuration conf,
- Path subDir, FileStatus fileStatus,
+ Path baseDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
- Path currentWorkDir, boolean honorSymLinkConf,
- LocalDirAllocator lDirAllocator)
+ Path currentWorkDir, boolean honorSymLinkConf)
throws IOException {
- String key = getKey(cache, conf, confFileStamp);
+ String cacheId = makeRelative(cache, conf);
CacheStatus lcacheStatus;
Path localizedPath;
synchronized (cachedArchives) {
- lcacheStatus = cachedArchives.get(key);
+ lcacheStatus = cachedArchives.get(cacheId);
if (lcacheStatus == null) {
// was never localized
- String cachePath = new Path (subDir,
- new Path(String.valueOf(random.nextLong()),
- makeRelative(cache, conf))).toString();
- Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
- fileStatus.getLen(), conf);
- lcacheStatus = new CacheStatus(
- new Path(localPath.toString().replace(cachePath, "")), localPath);
- cachedArchives.put(key, lcacheStatus);
+ lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
+ cachedArchives.put(cacheId, lcacheStatus);
}
- lcacheStatus.refcount++;
- }
-
- synchronized (lcacheStatus) {
- if (!lcacheStatus.isInited()) {
- localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
- fileStatus, isArchive);
- lcacheStatus.initComplete();
- } else {
- localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
- lcacheStatus, fileStatus, isArchive);
+
+ synchronized (lcacheStatus) {
+ localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
+ fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
+ lcacheStatus.refcount++;
}
- createSymlink(conf, cache, lcacheStatus, isArchive,
- currentWorkDir, honorSymLinkConf);
}
-
+
// try deleting stuff if you can
long size = 0;
- synchronized (lcacheStatus) {
- synchronized (baseDirSize) {
- Long get = baseDirSize.get(lcacheStatus.getBaseDir());
- if ( get != null ) {
- size = get.longValue();
- } else {
- LOG.warn("Cannot find size of baseDir: " +
lcacheStatus.getBaseDir());
- }
+ synchronized (baseDirSize) {
+ Long get = baseDirSize.get(baseDir);
+ if ( get != null ) {
+ size = get.longValue();
}
}
// setting the cache size to a default of 10GB
@@ -291,52 +268,39 @@ public class DistributedCache {
* is contained in.
* @throws IOException
*/
- public static void releaseCache(URI cache, Configuration conf, long timeStamp)
+ public static void releaseCache(URI cache, Configuration conf)
throws IOException {
- String cacheId = getKey(cache, conf, timeStamp);
+ String cacheId = makeRelative(cache, conf);
synchronized (cachedArchives) {
CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- if (lcacheStatus == null) {
- LOG.warn("Cannot find localized cache: " + cache +
- " (key: " + cacheId + ") in releaseCache!");
+ if (lcacheStatus == null)
return;
+ synchronized (lcacheStatus) {
+ lcacheStatus.refcount--;
}
- lcacheStatus.refcount--;
}
}
// To delete the caches which have a refcount of zero
private static void deleteCache(Configuration conf) throws IOException {
- Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
// try deleting cache Status with refcount of zero
synchronized (cachedArchives) {
for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
String cacheId = (String) it.next();
CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- if (lcacheStatus.refcount == 0) {
- // delete this cache entry from the global list
- // and mark the localized file for deletion
- deleteSet.add(lcacheStatus);
- it.remove();
- }
- }
- }
-
- // do the deletion, after releasing the global lock
- for (CacheStatus lcacheStatus : deleteSet) {
- synchronized (lcacheStatus) {
- FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
- LOG.info("Deleted path " + lcacheStatus.localLoadPath);
- // decrement the size of the cache from baseDirSize
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
- if ( dirSize != null ) {
- dirSize -= lcacheStatus.size;
- baseDirSize.put(lcacheStatus.baseDir, dirSize);
- } else {
- LOG.warn("Cannot find record of the baseDir: " +
- lcacheStatus.baseDir + " during delete!");
+ synchronized (lcacheStatus) {
+ if (lcacheStatus.refcount == 0) {
+ // delete this cache entry
+ FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+ synchronized (baseDirSize) {
+ Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+ if ( dirSize != null ) {
+ dirSize -= lcacheStatus.size;
+ baseDirSize.put(lcacheStatus.baseDir, dirSize);
+ }
+ }
+ it.remove();
}
}
}
@@ -369,108 +333,128 @@ public class DistributedCache {
return path;
}
- static String getKey(URI cache, Configuration conf, long timeStamp)
- throws IOException {
- return makeRelative(cache, conf) + String.valueOf(timeStamp);
- }
-
- private static Path checkCacheStatusValidity(Configuration conf,
- URI cache, long confFileStamp,
- CacheStatus cacheStatus,
- FileStatus fileStatus,
- boolean isArchive
- ) throws IOException {
- FileSystem fs = FileSystem.get(cache, conf);
- // Has to be
- if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
- cacheStatus, fileStatus)) {
- throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
- " for cache-file: " + cache);
- }
- LOG.info(String.format("Using existing cache of %s->%s",
- cache.toString(), cacheStatus.localLoadPath));
- return cacheStatus.localLoadPath;
- }
-
- private static void createSymlink(Configuration conf, URI cache,
- CacheStatus cacheStatus, boolean isArchive,
- Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
- boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
- if(cache.getFragment() == null) {
- doSymlink = false;
- }
- String link =
- currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
- File flink = new File(link);
- if (doSymlink){
- if (!flink.exists()) {
- FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
- }
- }
+ private static Path cacheFilePath(Path p) {
+ return new Path(p, p.getName());
}
-
+
// the method which actually copies the caches locally and unjars/unzips them
// and does chmod for the files
private static Path localizeCache(Configuration conf,
URI cache, long confFileStamp,
CacheStatus cacheStatus,
FileStatus fileStatus,
- boolean isArchive)
+ boolean isArchive,
+ Path currentWorkDir,boolean honorSymLinkConf)
throws IOException {
- FileSystem fs = getFileSystem(cache, conf);
- FileSystem localFs = FileSystem.getLocal(conf);
- Path parchive = null;
- if (isArchive) {
- parchive = new Path(cacheStatus.localLoadPath,
- new Path(cacheStatus.localLoadPath.getName()));
- } else {
- parchive = cacheStatus.localLoadPath;
+ boolean doSymlink = honorSymLinkConf && getSymlink(conf);
+ if(cache.getFragment() == null) {
+ doSymlink = false;
}
-
- if (!localFs.mkdirs(parchive.getParent())) {
- throw new IOException("Mkdirs failed to create directory " +
- cacheStatus.localLoadPath.toString());
- }
- String cacheId = cache.getPath();
- fs.copyToLocalFile(new Path(cacheId), parchive);
- if (isArchive) {
- String tmpArchive = parchive.toString().toLowerCase();
- File srcFile = new File(parchive.toString());
- File destDir = new File(parchive.getParent().toString());
- if (tmpArchive.endsWith(".jar")) {
- RunJar.unJar(srcFile, destDir);
- } else if (tmpArchive.endsWith(".zip")) {
- FileUtil.unZip(srcFile, destDir);
- } else if (isTarFile(tmpArchive)) {
- FileUtil.unTar(srcFile, destDir);
+ FileSystem fs = getFileSystem(cache, conf);
+ String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
+ File flink = new File(link);
+ if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
+ cacheStatus, fileStatus)) {
+ if (isArchive) {
+ if (doSymlink){
+ if (!flink.exists())
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+ link);
+ }
+ return cacheStatus.localLoadPath;
}
- // else will not do anyhting
- // and copy the file into the dir as it is
- }
+ else {
+ if (doSymlink){
+ if (!flink.exists())
+ FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
+ link);
+ }
+ return cacheFilePath(cacheStatus.localLoadPath);
+ }
+ } else {
+ // remove the old archive
+ // if the old archive cannot be removed since it is being used by another
+ // job
+ // return null
+ if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
+ throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
+ + " is in use and cannot be refreshed");
- long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
- cacheStatus.size = cacheSize;
- synchronized (baseDirSize) {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(cacheStatus.localLoadPath, true);
+ synchronized (baseDirSize) {
Long dirSize = baseDirSize.get(cacheStatus.baseDir);
- if (dirSize == null) {
- dirSize = Long.valueOf(cacheSize);
- } else {
- dirSize += cacheSize;
- }
- baseDirSize.put(cacheStatus.baseDir, dirSize);
- }
+ if ( dirSize != null ) {
+ dirSize -= cacheStatus.size;
+ baseDirSize.put(cacheStatus.baseDir, dirSize);
+ }
+ }
+ Path parchive = new Path(cacheStatus.localLoadPath,
+ new Path(cacheStatus.localLoadPath.getName()));
+
+ if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
+ throw new IOException("Mkdirs failed to create directory " +
+ cacheStatus.localLoadPath.toString());
+ }
+
+ String cacheId = cache.getPath();
+ fs.copyToLocalFile(new Path(cacheId), parchive);
+ if (isArchive) {
+ String tmpArchive = parchive.toString().toLowerCase();
+ File srcFile = new File(parchive.toString());
+ File destDir = new File(parchive.getParent().toString());
+ if (tmpArchive.endsWith(".jar")) {
+ RunJar.unJar(srcFile, destDir);
+ } else if (tmpArchive.endsWith(".zip")) {
+ FileUtil.unZip(srcFile, destDir);
+ } else if (isTarFile(tmpArchive)) {
+ FileUtil.unTar(srcFile, destDir);
+ }
+ // else will not do anyhting
+ // and copy the file into the dir as it is
+ }
- // do chmod here
- try {
- //Setting recursive permission to grant everyone read and execute
- FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
- } catch(InterruptedException e) {
+ long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
+ cacheStatus.size = cacheSize;
+ synchronized (baseDirSize) {
+ Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+ if( dirSize == null ) {
+ dirSize = Long.valueOf(cacheSize);
+ } else {
+ dirSize += cacheSize;
+ }
+ baseDirSize.put(cacheStatus.baseDir, dirSize);
+ }
+
+ // do chmod here
+ try {
+ //Setting recursive permission to grant everyone read and execute
+ FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+ } catch(InterruptedException e) {
LOG.warn("Exception in chmod" + e.toString());
- }
+ }
- // update cacheStatus to reflect the newly cached file
- cacheStatus.mtime = getTimestamp(conf, cache);
- return cacheStatus.localLoadPath;
+ // update cacheStatus to reflect the newly cached file
+ cacheStatus.currentStatus = true;
+ cacheStatus.mtime = getTimestamp(conf, cache);
+ }
+
+ if (isArchive){
+ if (doSymlink){
+ if (!flink.exists())
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+ link);
+ }
+ return cacheStatus.localLoadPath;
+ }
+ else {
+ if (doSymlink){
+ if (!flink.exists())
+ FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
+ link);
+ }
+ return cacheFilePath(cacheStatus.localLoadPath);
+ }
}
private static boolean isTarFile(String filename) {
@@ -485,22 +469,27 @@ public class DistributedCache {
FileStatus fileStatus)
throws IOException {
// check for existence of the cache
- long dfsFileStamp;
- if (fileStatus != null) {
- dfsFileStamp = fileStatus.getModificationTime();
+ if (lcacheStatus.currentStatus == false) {
+ return false;
} else {
- dfsFileStamp = getTimestamp(conf, cache);
- }
+ long dfsFileStamp;
+ if (fileStatus != null) {
+ dfsFileStamp = fileStatus.getModificationTime();
+ } else {
+ dfsFileStamp = getTimestamp(conf, cache);
+ }
- // ensure that the file on hdfs hasn't been modified since the job started
- if (dfsFileStamp != confFileStamp) {
- LOG.fatal("File: " + cache + " has changed on HDFS since job started");
- throw new IOException("File: " + cache +
- " has changed on HDFS since job started");
- }
+ // ensure that the file on hdfs hasn't been modified since the job started
+ if (dfsFileStamp != confFileStamp) {
+ LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+ throw new IOException("File: " + cache +
+ " has changed on HDFS since job started");
+ }
- if (dfsFileStamp != lcacheStatus.mtime) {
- return false;
+ if (dfsFileStamp != lcacheStatus.mtime) {
+ // needs refreshing
+ return false;
+ }
}
return true;
@@ -852,6 +841,9 @@ public class DistributedCache {
}
private static class CacheStatus {
+ // false, not loaded yet, true is loaded
+ boolean currentStatus;
+
// the local load path of this cache
Path localLoadPath;
@@ -866,33 +858,16 @@ public class DistributedCache {
// the cache-file modification time
long mtime;
-
- // is it initialized?
- boolean inited = false;
public CacheStatus(Path baseDir, Path localLoadPath) {
super();
+ this.currentStatus = false;
this.localLoadPath = localLoadPath;
this.refcount = 0;
this.mtime = -1;
this.baseDir = baseDir;
this.size = 0;
}
-
- // get the base dir for the cache
- Path getBaseDir() {
- return baseDir;
- }
-
- // Is it initialized?
- boolean isInited() {
- return inited;
- }
-
- // mark it as initalized
- void initComplete() {
- inited = true;
- }
}
/**
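One effect of the revert, visible in the hunks above: cache entries are again keyed by the relative cache path alone rather than by path plus configured timestamp, with staleness re-checked against CacheStatus.mtime (and the restored currentStatus flag) inside localizeCache. A short contrast of the two keys, for illustration only; cache, conf and confFileStamp are assumed to be in scope:

    // Patched key (removed above): a separate entry per (relative path, timestamp) pair.
    String key = DistributedCache.makeRelative(cache, conf) + String.valueOf(confFileStamp);

    // Restored key: one entry per relative path; freshness is handled at localization time.
    String cacheId = DistributedCache.makeRelative(cache, conf);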
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077104&r1=1077103&r2=1077104&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:40:56 2011
@@ -179,15 +179,21 @@ abstract class TaskRunner extends Thread
fileSystem = FileSystem.get(archives[i], conf);
fileStatus = fileSystem.getFileStatus(
new Path(archives[i].getPath()));
+ String cacheId = DistributedCache.makeRelative(archives[i],conf);
+ String cachePath = TaskTracker.getCacheSubdir() +
+ Path.SEPARATOR + cacheId;
+
+ localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), conf);
+ baseDir = localPath.toString().replace(cacheId, "");
p[i] = DistributedCache.getLocalCache(archives[i], conf,
- new Path(TaskTracker.getCacheSubdir()),
+ new Path(baseDir),
fileStatus,
true, Long.parseLong(
archivesTimestamps[i]),
new Path(workDir.
getAbsolutePath()),
- false,
- lDirAlloc);
+ false);
}
DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
@@ -199,15 +205,21 @@ abstract class TaskRunner extends Thread
fileSystem = FileSystem.get(files[i], conf);
fileStatus = fileSystem.getFileStatus(
new Path(files[i].getPath()));
+ String cacheId = DistributedCache.makeRelative(files[i], conf);
+ String cachePath = TaskTracker.getCacheSubdir() +
+ Path.SEPARATOR + cacheId;
+
+ localPath = lDirAlloc.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), conf);
+ baseDir = localPath.toString().replace(cacheId, "");
p[i] = DistributedCache.getLocalCache(files[i], conf,
- new Path(TaskTracker.getCacheSubdir()),
+ new Path(baseDir),
fileStatus,
false, Long.parseLong(
fileTimestamps[i]),
new Path(workDir.
getAbsolutePath()),
- false,
- lDirAlloc);
+ false);
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
@@ -530,19 +542,14 @@ abstract class TaskRunner extends Thread
try{
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
- String[] archivesTimestamps =
- DistributedCache.getArchiveTimestamps(conf);
- String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
if (archives != null){
for (int i = 0; i < archives.length; i++){
- DistributedCache.releaseCache(archives[i], conf,
- Long.parseLong(archivesTimestamps[i]));
+ DistributedCache.releaseCache(archives[i], conf);
}
}
if (files != null){
for(int i = 0; i < files.length; i++){
- DistributedCache.releaseCache(files[i], conf,
- Long.parseLong(fileTimestamps[i]));
+ DistributedCache.releaseCache(files[i], conf);
}
}
}catch(IOException ie){
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java?rev=1077104&r1=1077103&r2=1077104&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestDistributedCache.java Fri Mar 4 03:40:56 2011
@@ -8,7 +8,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import junit.framework.TestCase;
@@ -16,10 +15,11 @@ import junit.framework.TestCase;
public class TestDistributedCache extends TestCase {
static final URI LOCAL_FS = URI.create("file:///");
- private static String TEST_CACHE_BASE_DIR = "cachebasedir";
+ private static String TEST_CACHE_BASE_DIR =
+ new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
+ .toString().replace(' ', '+');
private static String TEST_ROOT_DIR =
System.getProperty("test.build.data", "/tmp/distributedcache");
- private static String MAPRED_LOCAL_DIR = TEST_ROOT_DIR + "/mapred/local";
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
private Configuration conf;
@@ -34,7 +34,6 @@ public class TestDistributedCache extend
protected void setUp() throws IOException {
conf = new Configuration();
conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
- conf.set("mapred.local.dir", MAPRED_LOCAL_DIR);
localfs = FileSystem.get(LOCAL_FS, conf);
firstCacheFile = new Path(TEST_ROOT_DIR+"/firstcachefile");
secondCacheFile = new Path(TEST_ROOT_DIR+"/secondcachefile");
@@ -44,19 +43,15 @@ public class TestDistributedCache extend
/** test delete cache */
public void testDeleteCache() throws Exception {
- long now = System.currentTimeMillis();
- DistributedCache.getLocalCache(firstCacheFile.toUri(), conf,
- new Path(TEST_CACHE_BASE_DIR), localfs.getFileStatus(firstCacheFile),
- false, now, new Path(TEST_ROOT_DIR));
- DistributedCache.releaseCache(firstCacheFile.toUri(), conf, now);
+ DistributedCache.getLocalCache(firstCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR),
+ false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+ DistributedCache.releaseCache(firstCacheFile.toUri(), conf);
//in above code,localized a file of size 4K and then release the cache which will cause the cache
//be deleted when the limit goes out. The below code localize another cache which's designed to
//sweep away the first cache.
- DistributedCache.getLocalCache(secondCacheFile.toUri(), conf,
- new Path(TEST_CACHE_BASE_DIR), localfs.getFileStatus(firstCacheFile),
- false, now, new Path(TEST_ROOT_DIR));
- FileStatus[] dirStatuses = localfs.listStatus(
- new Path(MAPRED_LOCAL_DIR, TEST_CACHE_BASE_DIR));
+ DistributedCache.getLocalCache(secondCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR),
+ false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+ FileStatus[] dirStatuses = localfs.listStatus(new Path(TEST_CACHE_BASE_DIR));
assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
dirStatuses.length > 1);
}