Author: tucu
Date: Thu Jan 10 00:55:11 2013
New Revision: 1431168
URL: http://svn.apache.org/viewvc?rev=1431168&view=rev
Log:
MAPREDUCE-4907. TrackerDistributedCacheManager issues too many getFileStatus
calls. (sandyr via tucu)
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1431168&r1=1431167&r2=1431168&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Thu Jan 10 00:55:11 2013
@@ -154,6 +154,9 @@ Release 1.2.0 - unreleased
HDFS-4320. Add a separate configuration for namenode rpc address instead
of using fs.default.name. (Mostafa Elhemali via suresh)
+ MAPREDUCE-4907. TrackerDistributedCacheManager issues too many
+ getFileStatus calls. (sandyr via tucu)
+
OPTIMIZATIONS
HDFS-2533. Backport: Remove needless synchronization on some FSDataSet
Modified:
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1431168&r1=1431167&r2=1431168&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Thu Jan 10 00:55:11 2013
@@ -320,14 +320,15 @@ public class TrackerDistributedCacheMana
* @return true if the path in the uri is visible to all, false otherwise
* @throws IOException
*/
- static boolean isPublic(Configuration conf, URI uri) throws IOException {
+ static boolean isPublic(Configuration conf, URI uri,
+ Map<URI, FileStatus> statCache) throws IOException {
FileSystem fs = FileSystem.get(uri, conf);
Path current = new Path(uri.getPath());
//the leaf level file should be readable by others
- if (!checkPermissionOfOther(fs, current, FsAction.READ)) {
+ if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
return false;
}
- return ancestorsHaveExecutePermissions(fs, current.getParent());
+ return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
}
/**
@@ -335,12 +336,12 @@ public class TrackerDistributedCacheMana
* permission set for all users (i.e. that other users can traverse
* the directory heirarchy to the given path)
*/
- static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path)
- throws IOException {
+ static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path,
+ Map<URI, FileStatus> statCache) throws IOException {
Path current = path;
while (current != null) {
//the subdirs in the path should have execute permissions for others
- if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+ if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
return false;
}
current = current.getParent();
@@ -358,8 +359,8 @@ public class TrackerDistributedCacheMana
* @throws IOException
*/
private static boolean checkPermissionOfOther(FileSystem fs, Path path,
- FsAction action) throws IOException {
- FileStatus status = fs.getFileStatus(path);
+ FsAction action, Map<URI, FileStatus> statCache) throws IOException {
+ FileStatus status = getFileStatus(fs, path, statCache);
FsPermission perms = status.getPermission();
FsAction otherAction = perms.getOtherAction();
if (otherAction.implies(action)) {
@@ -712,25 +713,69 @@ public class TrackerDistributedCacheMana
}
/**
+ * Gets the file status for the given URI. If the URI is in the cache,
+ * returns it. Otherwise, fetches it and adds it to the cache.
+ */
+ private static FileStatus getFileStatus(Configuration job, URI uri,
+ Map<URI, FileStatus> statCache) throws IOException {
+ FileStatus stat = statCache.get(uri);
+ if (stat == null) {
+ stat = DistributedCache.getFileStatus(job, uri);
+ statCache.put(uri, stat);
+ }
+ return stat;
+ }
+
+ private static FileStatus getFileStatus(FileSystem fs, Path path,
+ Map<URI, FileStatus> statCache) throws IOException {
+ URI uri = path.toUri();
+ FileStatus stat = statCache.get(uri);
+ if (stat == null) {
+ stat = fs.getFileStatus(path);
+ statCache.put(uri, stat);
+ }
+ return stat;
+ }
+
+ /**
* Determines timestamps of files to be cached, and stores those
- * in the configuration. This is intended to be used internally by JobClient
- * after all cache files have been added.
+ * in the configuration. Determines the visibilities of the distributed cache
+ * files and archives. The visibility of a cache path is "public" if the leaf
+ * component has READ permissions for others, and the parent subdirs have
+ * EXECUTE permissions for others.
*
* This is an internal method!
*
+ * @param job
+ * @throws IOException
+ */
+ public static void determineTimestampsAndCacheVisibilities(Configuration job)
+ throws IOException {
+ Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
+ determineTimestamps(job, statCache);
+ determineCacheVisibilities(job, statCache);
+ }
+
+ /**
+ * Determines timestamps of files to be cached, and stores those
+ * in the configuration.
+ *
* @param job Configuration of a job.
+ * @param statCache a cache of FileStatuses so that redundant remote
+ * calls can be avoided
* @throws IOException
*/
- public static void determineTimestamps(Configuration job) throws IOException {
+ static void determineTimestamps(Configuration job,
+ Map<URI, FileStatus> statCache) throws IOException {
URI[] tarchives = DistributedCache.getCacheArchives(job);
if (tarchives != null) {
- FileStatus status = DistributedCache.getFileStatus(job, tarchives[0]);
+ FileStatus status = getFileStatus(job, tarchives[0], statCache);
StringBuffer archiveFileSizes =
new StringBuffer(String.valueOf(status.getLen()));
StringBuffer archiveTimestamps =
new StringBuffer(String.valueOf(status.getModificationTime()));
for (int i = 1; i < tarchives.length; i++) {
- status = DistributedCache.getFileStatus(job, tarchives[i]);
+ status = getFileStatus(job, tarchives[i], statCache);
archiveFileSizes.append(",");
archiveFileSizes.append(String.valueOf(status.getLen()));
archiveTimestamps.append(",");
@@ -744,7 +789,7 @@ public class TrackerDistributedCacheMana
URI[] tfiles = DistributedCache.getCacheFiles(job);
if (tfiles != null) {
- FileStatus status = DistributedCache.getFileStatus(job, tfiles[0]);
+ FileStatus status = getFileStatus(job, tfiles[0], statCache);
StringBuffer fileSizes =
new StringBuffer(String.valueOf(status.getLen()));
StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
@@ -766,27 +811,29 @@ public class TrackerDistributedCacheMana
* has READ permissions for others, and the parent subdirs have
* EXECUTE permissions for others
* @param job
+ * @param statCache a cache of FileStatuses so that redundant remote
+ * calls can be avoided
* @throws IOException
*/
- public static void determineCacheVisibilities(Configuration job)
- throws IOException {
+ static void determineCacheVisibilities(Configuration job,
+ Map<URI, FileStatus> statCache) throws IOException {
URI[] tarchives = DistributedCache.getCacheArchives(job);
if (tarchives != null) {
StringBuffer archiveVisibilities =
- new StringBuffer(String.valueOf(isPublic(job, tarchives[0])));
+ new StringBuffer(String.valueOf(isPublic(job, tarchives[0], statCache)));
for (int i = 1; i < tarchives.length; i++) {
archiveVisibilities.append(",");
- archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i])));
+ archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i], statCache)));
}
setArchiveVisibilities(job, archiveVisibilities.toString());
}
URI[] tfiles = DistributedCache.getCacheFiles(job);
if (tfiles != null) {
StringBuffer fileVisibilities =
- new StringBuffer(String.valueOf(isPublic(job, tfiles[0])));
+ new StringBuffer(String.valueOf(isPublic(job, tfiles[0], statCache)));
for (int i = 1; i < tfiles.length; i++) {
fileVisibilities.append(",");
- fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i])));
+ fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i], statCache)));
}
setFileVisibilities(job, fileVisibilities.toString());
}
Modified:
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1431168&r1=1431167&r2=1431168&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Jan 10 00:55:11 2013
@@ -827,10 +827,9 @@ public class JobClient extends Configure
// First we check whether the cached archives and files are legal.
TrackerDistributedCacheManager.validate(job);
- // set the timestamps of the archives and files
- TrackerDistributedCacheManager.determineTimestamps(job);
- // set the public/private visibility of the archives and files
- TrackerDistributedCacheManager.determineCacheVisibilities(job);
+ // set the timestamps of the archives and files and set the
+ // public/private visibility of the archives and files
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job);
// get DelegationTokens for cache files
TrackerDistributedCacheManager.getDelegationTokens(job,
job.getCredentials());
Modified:
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1431168&r1=1431167&r2=1431168&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Thu Jan 10 00:55:11 2013
@@ -24,6 +24,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -108,7 +110,7 @@ public class TestTrackerDistributedCache
assertTrue("Test root directory " + TEST_ROOT + " and all of its " +
"parent directories must have a+x permissions",
TrackerDistributedCacheManager.ancestorsHaveExecutePermissions(
- fs, new Path(TEST_ROOT.toString())));
+ fs, new Path(TEST_ROOT.toString()), new HashMap<URI, FileStatus>()));
// Prepare the tests' mapred-local-dir
ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
@@ -187,8 +189,11 @@ public class TestTrackerDistributedCache
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
DistributedCache.addFileToClassPath(secondCacheFile, subConf,
FileSystem.get(subConf));
- TrackerDistributedCacheManager.determineTimestamps(subConf);
- TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
+
+ Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
+ TrackerDistributedCacheManager.determineTimestamps(subConf, statCache);
+ TrackerDistributedCacheManager.determineCacheVisibilities(subConf, statCache);
+ assertEquals(2, statCache.size());
// ****** End of imitating JobClient code
Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
@@ -273,8 +278,7 @@ public class TestTrackerDistributedCache
conf1.set("user.name", userName);
DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
- TrackerDistributedCacheManager.determineTimestamps(conf1);
- TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
// Task localizing for first job
JobID jobId = new JobID("jt", 1);
@@ -302,8 +306,7 @@ public class TestTrackerDistributedCache
DistributedCache.addCacheFile(firstCacheDirPublic.toUri(), conf2);
DistributedCache.addCacheFile(firstCacheDirPrivate.toUri(), conf2);
- TrackerDistributedCacheManager.determineTimestamps(conf2);
- TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2);
// Task localizing for second job
JobID job2Id = new JobID("jt", 2);
@@ -339,8 +342,7 @@ public class TestTrackerDistributedCache
// add a file that is never localized
DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf3);
- TrackerDistributedCacheManager.determineTimestamps(conf3);
- TrackerDistributedCacheManager.determineCacheVisibilities(conf3);
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf3);
// Task localizing for third job
// localization for the "firstCacheFile" will fail.
@@ -379,7 +381,7 @@ public class TestTrackerDistributedCache
* @throws LoginException
*/
public void testPublicPrivateCache()
- throws IOException, LoginException, InterruptedException {
+ throws IOException, LoginException, InterruptedException, URISyntaxException {
if (!canRun()) {
return;
}
@@ -404,8 +406,7 @@ public class TestTrackerDistributedCache
DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
DistributedCache.addCacheArchive(cacheFile.toUri(), conf1);
- TrackerDistributedCacheManager.determineTimestamps(conf1);
- TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
dumpState(conf1);
TaskDistributedCacheManager handle = manager
@@ -491,7 +492,7 @@ public class TestTrackerDistributedCache
}
private void checkLocalizedPath(boolean visibility)
- throws IOException, LoginException, InterruptedException {
+ throws IOException, LoginException, InterruptedException, URISyntaxException {
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf, taskController);
String userName = getJobOwnerName();
@@ -506,8 +507,7 @@ public class TestTrackerDistributedCache
Configuration conf1 = new Configuration(conf);
conf1.set("user.name", userName);
DistributedCache.addCacheFile(cacheFile.toUri(), conf1);
- TrackerDistributedCacheManager.determineTimestamps(conf1);
- TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1);
dumpState(conf1);
// Task localizing for job
@@ -894,8 +894,7 @@ public class TestTrackerDistributedCache
createPrivateTempFile(thirdCacheFile);
createPrivateTempFile(fourthCacheFile);
DistributedCache.setCacheFiles(new URI[]{thirdCacheFile.toUri()}, conf2);
- TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
- TrackerDistributedCacheManager.determineTimestamps(conf2);
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2);
stat = fs.getFileStatus(thirdCacheFile);
CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(),
CacheFile.FileType.REGULAR, false,
@@ -922,8 +921,7 @@ public class TestTrackerDistributedCache
DistributedCache.setCacheFiles(new URI[]{fourthCacheFile.toUri()}, conf2);
DistributedCache.setLocalFiles(conf2, thirdCacheFile.toUri().toString());
- TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
- TrackerDistributedCacheManager.determineTimestamps(conf2);
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf2);
Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2,
TaskTracker.getPrivateDistributedCacheDir(userName),
fs.getFileStatus(fourthCacheFile), false,
@@ -1100,8 +1098,7 @@ public class TestTrackerDistributedCache
Configuration subConf = new Configuration(myConf);
subConf.set("user.name", userName);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
- TrackerDistributedCacheManager.determineTimestamps(subConf);
- TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(subConf);
// ****** End of imitating JobClient code
// ****** Imitate TaskRunner code.
@@ -1150,8 +1147,7 @@ public class TestTrackerDistributedCache
Configuration subConf2 = new Configuration(myConf);
subConf2.set("user.name", userName);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
- TrackerDistributedCacheManager.determineTimestamps(subConf2);
- TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(subConf2);
handle =
manager.newTaskDistributedCacheManager(new JobID("jt", 2), subConf2);