Modified: 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1520665&r1=1520664&r2=1520665&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 (original)
+++ 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 Fri Sep  6 18:52:50 2013
@@ -119,6 +119,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -6756,9 +6757,11 @@ public class FSNamesystem implements Nam
     if (retryCacheEntry != null && retryCacheEntry.isSuccess()) {
       return (List<Fallible<PathCacheEntry>>) retryCacheEntry.getPayload();
     }
-    final FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = isPermissionEnabled ?
+        getPermissionChecker() : null;
     boolean success = false;
     List<Fallible<PathCacheEntry>> results = null;
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -6766,7 +6769,7 @@ public class FSNamesystem implements Nam
         throw new SafeModeException(
             "Cannot add path cache directive", safeMode);
       }
-      results = cacheManager.addDirectives(pc, directives);
+      results = cacheManager.addDirectives(directives, pc);
       //getEditLog().logAddPathCacheDirectives(results); FIXME: HDFS-5119
       success = true;
     } finally {
@@ -6774,7 +6777,7 @@ public class FSNamesystem implements Nam
       if (success) {
         getEditLog().logSync();
       }
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(success, "addPathCacheDirectives", null, null, null);
       }
       RetryCache.setState(retryCacheEntry, success, results);
@@ -6783,147 +6786,175 @@ public class FSNamesystem implements Nam
   }
 
   @SuppressWarnings("unchecked")
-  List<Fallible<Long>> removePathCacheEntries(List<Long> ids)
-      throws IOException {
-    final FSPermissionChecker pc = getPermissionChecker();
+  List<Fallible<Long>> removePathCacheEntries(List<Long> ids) throws IOException {
+    CacheEntryWithPayload retryCacheEntry =
+        RetryCache.waitForCompletion(retryCache, null);
+    if (retryCacheEntry != null && retryCacheEntry.isSuccess()) {
+      return (List<Fallible<Long>>) retryCacheEntry.getPayload();
+    }
+    final FSPermissionChecker pc = isPermissionEnabled ?
+        getPermissionChecker() : null;
     boolean success = false;
     List<Fallible<Long>> results = null;
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       if (isInSafeMode()) {
         throw new SafeModeException(
-            "Cannot add path cache directive", safeMode);
+            "Cannot remove path cache directives", safeMode);
       }
-      results = cacheManager.removeEntries(pc, ids);
+      results = cacheManager.removeEntries(ids, pc);
       //getEditLog().logRemovePathCacheEntries(results); FIXME: HDFS-5119
       success = true;
     } finally {
       writeUnlock();
-      if (success) {
-        getEditLog().logSync();
-      }
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(success, "removePathCacheEntries", null, null, null);
       }
+      RetryCache.setState(retryCacheEntry, success, results);
     }
+    getEditLog().logSync();
     return results;
   }
 
-  List<PathCacheEntry> listPathCacheEntries(long startId,
-      Long poolId, int maxReplies) throws IOException {
-    LOG.info("listPathCacheEntries with " + startId + " " + poolId);
-    final FSPermissionChecker pc = getPermissionChecker();
-    return cacheManager.listPathCacheEntries(pc, startId, poolId, maxReplies);
+  BatchedListEntries<PathCacheEntry> listPathCacheEntries(long startId,
+      String pool) throws IOException {
+    final FSPermissionChecker pc = isPermissionEnabled ?
+        getPermissionChecker() : null;
+    BatchedListEntries<PathCacheEntry> results;
+    checkOperation(OperationCategory.READ);
+    readLock();
+    boolean success = false;
+    try {
+      checkOperation(OperationCategory.READ);
+      results = cacheManager.listPathCacheEntries(startId, pool, pc);
+      success = true;
+    } finally {
+      readUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "listPathCacheEntries", null, null, null);
+      }
+    }
+    return results;
   }
 
-  public CachePool addCachePool(CachePoolInfo req) throws IOException {
-    final FSPermissionChecker pc = getPermissionChecker();
-    CacheEntryWithPayload cacheEntry =
-        RetryCache.waitForCompletion(retryCache, null);
+  public void addCachePool(CachePoolInfo req) throws IOException {
+    final FSPermissionChecker pc = isPermissionEnabled ?
+        getPermissionChecker() : null;
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
-      return (CachePool)cacheEntry.getPayload(); // Return previous response
+      return; // Return previous response
     }
+    checkOperation(OperationCategory.WRITE);
     writeLock();
-    CachePool pool = null;
+    boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (!pc.isSuperUser()) {
-        throw new AccessControlException("Non-super users cannot " +
-            "add cache pools.");
-      }
       if (isInSafeMode()) {
         throw new SafeModeException(
             "Cannot add cache pool " + req.getPoolName(), safeMode);
       }
-      pool = cacheManager.addCachePool(req);
-      RetryCache.setState(cacheEntry, true);
+      if (pc != null) {
+        pc.checkSuperuserPrivilege();
+      }
+      cacheManager.addCachePool(req);
       //getEditLog().logAddCachePool(req); // FIXME: HDFS-5119
+      success = true;
     } finally {
       writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "addCachePool", req.getPoolName(), null, null);
+      }
+      RetryCache.setState(cacheEntry, success);
     }
-
+    
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(true, "addCachePool", req.getPoolName(), null, null);
-    }
-    return pool;
   }
 
-  public void modifyCachePool(long poolId, CachePoolInfo info)
-      throws IOException {
-    final FSPermissionChecker pc = getPermissionChecker();
+  public void modifyCachePool(CachePoolInfo req) throws IOException {
+    final FSPermissionChecker pc =
+        isPermissionEnabled ? getPermissionChecker() : null;
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
+    checkOperation(OperationCategory.WRITE);
     writeLock();
+    boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (!pc.isSuperUser()) {
-        throw new AccessControlException("Non-super users cannot " +
-            "modify cache pools.");
-      }
       if (isInSafeMode()) {
         throw new SafeModeException(
-            "Cannot modify cache pool " + info.getPoolName(), safeMode);
+            "Cannot modify cache pool " + req.getPoolName(), safeMode);
+      }
+      if (pc != null) {
+        pc.checkSuperuserPrivilege();
       }
-      cacheManager.modifyCachePool(poolId, info);
-      RetryCache.setState(cacheEntry, true);
+      cacheManager.modifyCachePool(req);
       //getEditLog().logModifyCachePool(req); // FIXME: HDFS-5119
+      success = true;
     } finally {
       writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "modifyCachePool", req.getPoolName(), null, null);
+      }
+      RetryCache.setState(cacheEntry, success);
     }
 
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(true, "modifyCachePool", info.getPoolName(), null, null);
-    }
   }
 
-  public void removeCachePool(long poolId) throws IOException {
-    final FSPermissionChecker pc = getPermissionChecker();
+  public void removeCachePool(String cachePoolName) throws IOException {
+    final FSPermissionChecker pc =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    checkOperation(OperationCategory.WRITE);
     writeLock();
-    CachePool pool;
+    boolean success = false;
     try {
       checkOperation(OperationCategory.WRITE);
-      if (!pc.isSuperUser()) {
-        throw new AccessControlException("Non-super users cannot " +
-            "remove cache pools.");
-      }
-      pool = cacheManager.getCachePool(poolId);
       if (isInSafeMode()) {
-        String identifier;
-        if (pool == null) {
-          identifier = "with id " + Long.toString(poolId);
-        } else {
-          identifier = pool.getInfo().getPoolName();
-        }
         throw new SafeModeException(
-            "Cannot remove cache pool " + identifier, safeMode);
+            "Cannot remove cache pool " + cachePoolName, safeMode);
       }
-      cacheManager.removeCachePool(poolId);
+      if (pc != null) {
+        pc.checkSuperuserPrivilege();
+      }
+      cacheManager.removeCachePool(cachePoolName);
       //getEditLog().logRemoveCachePool(req); // FIXME: HDFS-5119
+      success = true;
     } finally {
       writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "removeCachePool", cachePoolName, null, null);
+      }
+      RetryCache.setState(cacheEntry, success);
     }
-
+    
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(true, "removeCachePool", pool.getInfo().getPoolName(),
-          null, null);
-    }
   }
 
-  public List<CachePool> listCachePools(long prevKey,
-      int maxRepliesPerRequest) throws IOException {
-    List<CachePool> results;
+  public BatchedListEntries<CachePoolInfo> listCachePools(String prevKey)
+      throws IOException {
+    final FSPermissionChecker pc =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    BatchedListEntries<CachePoolInfo> results;
+    checkOperation(OperationCategory.READ);
+    boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      results = cacheManager.listCachePools(prevKey, maxRepliesPerRequest);
+      results = cacheManager.listCachePools(pc, prevKey);
+      success = true;
     } finally {
       readUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "listCachePools", null, null, null);
+      }
     }
     return results;
   }

Modified: 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1520665&r1=1520664&r2=1520665&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
 (original)
+++ 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
 Fri Sep  6 18:52:50 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -28,7 +29,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -264,16 +264,15 @@ class FSPermissionChecker {
    * @return if the pool can be accessed
    */
   public boolean checkPermission(CachePool pool, FsAction access) {
-    CachePoolInfo info = pool.getInfo();
-    FsPermission mode = info.getMode();
+    FsPermission mode = pool.getMode();
     if (isSuperUser()) {
       return true;
     }
-    if (user.equals(info.getOwnerName())
+    if (user.equals(pool.getOwnerName())
         && mode.getUserAction().implies(access)) {
       return true;
     }
-    if (groups.contains(info.getGroupName())
+    if (groups.contains(pool.getGroupName())
         && mode.getGroupAction().implies(access)) {
       return true;
     }

Modified: 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1520665&r1=1520664&r2=1520665&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 (original)
+++ 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 Fri Sep  6 18:52:50 2013
@@ -31,11 +31,13 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -60,9 +62,9 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
 import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -1223,20 +1225,17 @@ class NameNodeRpcServer implements Namen
   private class ServerSidePathCacheEntriesIterator
       extends BatchedRemoteIterator<Long, PathCacheEntry> {
 
-    private final Long poolId;
+    private final String pool;
 
-    public ServerSidePathCacheEntriesIterator(Long firstKey,
-        int maxRepliesPerRequest, Long poolId) {
-      super(firstKey, maxRepliesPerRequest);
-      this.poolId = poolId;
+    public ServerSidePathCacheEntriesIterator(Long firstKey, String pool) {
+      super(firstKey);
+      this.pool = pool;
     }
 
     @Override
     public BatchedEntries<PathCacheEntry> makeRequest(
-        Long prevKey, int maxRepliesPerRequest) throws IOException {
-      return new BatchedListEntries<PathCacheEntry>(
-          namesystem.listPathCacheEntries(prevKey, poolId,
-              maxRepliesPerRequest));
+        Long nextKey) throws IOException {
+      return namesystem.listPathCacheEntries(nextKey, pool);
     }
 
     @Override
@@ -1244,52 +1243,50 @@ class NameNodeRpcServer implements Namen
       return entry.getEntryId();
     }
   }
-
+  
   @Override
   public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId,
-      long poolId, int maxReplies) throws IOException {
-    return new ServerSidePathCacheEntriesIterator(prevId, maxReplies, poolId);
+      String pool) throws IOException {
+    return new ServerSidePathCacheEntriesIterator(prevId, pool);
   }
 
   @Override
-  public CachePool addCachePool(CachePoolInfo info) throws IOException {
-    return namesystem.addCachePool(info);
+  public void addCachePool(CachePoolInfo info) throws IOException {
+    namesystem.addCachePool(info);
   }
 
   @Override
-  public void modifyCachePool(long poolId, CachePoolInfo info)
-      throws IOException {
-    namesystem.modifyCachePool(poolId, info);
+  public void modifyCachePool(CachePoolInfo info) throws IOException {
+    namesystem.modifyCachePool(info);
   }
 
   @Override
-  public void removeCachePool(long poolId) throws IOException {
-    namesystem.removeCachePool(poolId);
+  public void removeCachePool(String cachePoolName) throws IOException {
+    namesystem.removeCachePool(cachePoolName);
   }
 
   private class ServerSideCachePoolIterator 
-      extends BatchedRemoteIterator<Long, CachePool> {
+      extends BatchedRemoteIterator<String, CachePoolInfo> {
 
-    public ServerSideCachePoolIterator(long prevId, int maxRepliesPerRequest) {
-      super(prevId, maxRepliesPerRequest);
+    public ServerSideCachePoolIterator(String prevKey) {
+      super(prevKey);
     }
 
     @Override
-    public BatchedEntries<CachePool> makeRequest(Long prevId,
-        int maxRepliesPerRequest) throws IOException {
-      return new BatchedListEntries<CachePool>(
-          namesystem.listCachePools(prevId, maxRepliesPerRequest));
+    public BatchedEntries<CachePoolInfo> makeRequest(String prevKey)
+        throws IOException {
+      return namesystem.listCachePools(prevKey);
     }
 
     @Override
-    public Long elementToPrevKey(CachePool element) {
-      return element.getId();
+    public String elementToPrevKey(CachePoolInfo element) {
+      return element.getPoolName();
     }
   }
 
   @Override
-  public RemoteIterator<CachePool> listCachePools(long prevPoolId,
-      int maxRepliesPerRequest) throws IOException {
-    return new ServerSideCachePoolIterator(prevPoolId, maxRepliesPerRequest);
+  public RemoteIterator<CachePoolInfo> listCachePools(String prevKey)
+      throws IOException {
+    return new ServerSideCachePoolIterator(prevKey);
   }
 }

Modified: 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1520665&r1=1520664&r2=1520665&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
 (original)
+++ 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
 Fri Sep  6 18:52:50 2013
@@ -363,27 +363,9 @@ message IsFileClosedResponseProto {
   required bool result = 1;
 }
 
-message CachePoolInfoProto {
-  optional string poolName = 1;
-  optional string ownerName = 2;
-  optional string groupName = 3;
-  optional int32 mode = 4;
-  optional int32 weight = 5;
-}
-
-message CachePoolProto {
-  optional int64 id = 1;
-  optional CachePoolInfoProto info = 2;
-}
-
 message PathCacheDirectiveProto {
   required string path = 1;
-  required CachePoolProto pool = 2;
-}
-
-message PathCacheEntryProto {
-  required int64 id = 1;
-  optional PathCacheDirectiveProto directive = 2;
+  required string pool = 2;
 }
 
 message AddPathCacheDirectivesRequestProto {
@@ -417,42 +399,52 @@ enum RemovePathCacheEntryErrorProto {
 }
 
 message ListPathCacheEntriesRequestProto {
-  required PathCacheEntryProto prevEntry = 1;
-  required CachePoolProto pool = 2;
-  optional int32 maxReplies = 3;
+  required int64 prevId = 1;
+  required string pool = 2;
+}
+
+message ListPathCacheEntriesElementProto {
+  required int64 id = 1;
+  required string path = 2;
+  required string pool = 3;
 }
 
 message ListPathCacheEntriesResponseProto {
-  repeated PathCacheEntryProto entries = 1;
+  repeated ListPathCacheEntriesElementProto elements = 1;
   required bool hasMore = 2;
 }
 
 message AddCachePoolRequestProto {
-  required CachePoolInfoProto info = 1;
+  required string poolName = 1;
+  optional string ownerName = 2;
+  optional string groupName = 3;
+  optional int32 mode = 4;
+  optional int32 weight = 5;
 }
 
-message AddCachePoolResponseProto {
-  required CachePoolProto pool = 1;
+message AddCachePoolResponseProto { // void response
 }
 
 message ModifyCachePoolRequestProto {
-  required CachePoolProto pool = 1;
-  required CachePoolInfoProto info = 2;
+  required string poolName = 1;
+  optional string ownerName = 2;
+  optional string groupName = 3;
+  optional int32 mode = 4;
+  optional int32 weight = 5;
 }
 
 message ModifyCachePoolResponseProto { // void response
 }
 
 message RemoveCachePoolRequestProto {
-  required CachePoolProto pool = 1;
+  required string poolName = 1;
 }
 
 message RemoveCachePoolResponseProto { // void response
 }
 
 message ListCachePoolsRequestProto {
-  required CachePoolProto prevPool = 1;
-  required int32 maxReplies = 2;
+  required string prevPoolName = 1;
 }
 
 message ListCachePoolsResponseProto {
@@ -461,7 +453,11 @@ message ListCachePoolsResponseProto {
 }
 
 message ListCachePoolsResponseElementProto {
-  required CachePoolProto pool = 1;
+  required string poolName = 1;
+  required string ownerName = 2;
+  required string groupName = 3;
+  required int32 mode = 4;
+  required int32 weight = 5;
 }
 
 message GetFileLinkInfoRequestProto {

Modified: 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java?rev=1520665&r1=1520664&r2=1520665&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java
 (original)
+++ 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java
 Fri Sep  6 18:52:50 2013
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
@@ -31,64 +31,58 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
+import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
-import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolError;
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.PoolWritePermissionDeniedError;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
 import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
 import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
-import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
 import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Fallible;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestPathCacheRequests {
   static final Log LOG = LogFactory.getLog(TestPathCacheRequests.class);
 
-  private static Configuration conf = new HdfsConfiguration();
-  private static MiniDFSCluster cluster = null;
-  private static NamenodeProtocols proto = null;
-
-  @Before
-  public void setUp() throws Exception {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    proto = cluster.getNameNodeRpc();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
+  private static final UserGroupInformation unprivilegedUser =
+      UserGroupInformation.createRemoteUser("unprivilegedUser");
 
   @Test
   public void testCreateAndRemovePools() throws Exception {
-    CachePoolInfo req =
-        CachePoolInfo.newBuilder().setPoolName("pool1").setOwnerName("bob")
-            .setGroupName("bobgroup").setMode(new FsPermission((short) 0755))
-            .setWeight(150).build();
-    CachePool pool = proto.addCachePool(req);
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    NamenodeProtocols proto = cluster.getNameNodeRpc();
+    CachePoolInfo req = new CachePoolInfo("pool1").
+        setOwnerName("bob").setGroupName("bobgroup").
+        setMode(new FsPermission((short)0755)).setWeight(150);
+    proto.addCachePool(req);
     try {
-      proto.removeCachePool(909);
+      proto.removeCachePool("pool99");
       Assert.fail("expected to get an exception when " +
           "removing a non-existent pool.");
     } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("can't remove " +
+          "nonexistent cache pool", ioe);
     }
-    proto.removeCachePool(pool.getId());
+    proto.removeCachePool("pool1");
     try {
-      proto.removeCachePool(pool.getId());
+      proto.removeCachePool("pool1");
       Assert.fail("expected to get an exception when " +
           "removing a non-existent pool.");
     } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("can't remove " +
+          "nonexistent cache pool", ioe);
     }
     req = new CachePoolInfo("pool2");
     proto.addCachePool(req);
@@ -96,42 +90,36 @@ public class TestPathCacheRequests {
 
   @Test
   public void testCreateAndModifyPools() throws Exception {
-    // Create a new pool
-    CachePoolInfo info = CachePoolInfo.newBuilder().
-        setPoolName("pool1").
-        setOwnerName("abc").
-        setGroupName("123").
-        setMode(new FsPermission((short)0755)).
-        setWeight(150).
-        build();
-    CachePool pool = proto.addCachePool(info);
-    CachePoolInfo actualInfo = pool.getInfo();
-    assertEquals("Expected info to match create time settings",
-        info, actualInfo);
-    // Modify the pool
-    info = CachePoolInfo.newBuilder().
-        setPoolName("pool2").
-        setOwnerName("def").
-        setGroupName("456").
-        setMode(new FsPermission((short)0644)).
-        setWeight(200).
-        build();
-    proto.modifyCachePool(pool.getId(), info);
-    // Check via listing this time
-    RemoteIterator<CachePool> iter = proto.listCachePools(0, 1);
-    CachePool listedPool = iter.next();
-    actualInfo = listedPool.getInfo();
-    assertEquals("Expected info to match modified settings", info, actualInfo);
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    // set low limits here for testing purposes
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 2);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    NamenodeProtocols proto = cluster.getNameNodeRpc();
+    proto.addCachePool(new CachePoolInfo("pool1").
+        setOwnerName("abc").setGroupName("123").
+        setMode(new FsPermission((short)0755)).setWeight(150));
+    proto.modifyCachePool(new CachePoolInfo("pool1").
+        setOwnerName("def").setGroupName("456"));
+    RemoteIterator<CachePoolInfo> iter = proto.listCachePools("");
+    CachePoolInfo info = iter.next();
+    assertEquals("pool1", info.getPoolName());
+    assertEquals("def", info.getOwnerName());
+    assertEquals("456", info.getGroupName());
+    assertEquals(new FsPermission((short)0755), info.getMode());
+    assertEquals(Integer.valueOf(150), info.getWeight());
 
     try {
-      proto.removeCachePool(808);
+      proto.removeCachePool("pool99");
       Assert.fail("expected to get an exception when " +
           "removing a non-existent pool.");
     } catch (IOException ioe) {
     }
-    proto.removeCachePool(pool.getId());
+    proto.removeCachePool("pool1");
     try {
-      proto.removeCachePool(pool.getId());
+      proto.removeCachePool("pool1");
       Assert.fail("expected to get an exception when " +
           "removing a non-existent pool.");
     } catch (IOException ioe) {
@@ -142,13 +130,13 @@ public class TestPathCacheRequests {
       RemoteIterator<PathCacheEntry> iter,
       long id0, long id1, long id2) throws Exception {
     Assert.assertEquals(new PathCacheEntry(id0,
-        new PathCacheDirective("/alpha", 1)),
+        new PathCacheDirective("/alpha", "pool1")),
         iter.next());
     Assert.assertEquals(new PathCacheEntry(id1,
-        new PathCacheDirective("/beta", 2)),
+        new PathCacheDirective("/beta", "pool2")),
         iter.next());
     Assert.assertEquals(new PathCacheEntry(id2,
-        new PathCacheDirective("/gamma", 1)),
+        new PathCacheDirective("/gamma", "pool1")),
         iter.next());
     Assert.assertFalse(iter.hasNext());
   }
@@ -161,36 +149,34 @@ public class TestPathCacheRequests {
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
-      final CachePool pool1 = proto.addCachePool(new CachePoolInfo("pool1"));
-      final CachePool pool2 = proto.addCachePool(new CachePoolInfo("pool2"));
-      final CachePool pool3 = proto.addCachePool(new CachePoolInfo("pool3"));
-      final CachePool pool4 = proto.addCachePool(CachePoolInfo.newBuilder()
-          .setPoolName("pool4")
-          .setMode(new FsPermission((short)0)).build());
-      UserGroupInformation testUgi = UserGroupInformation
-          .createUserForTesting("myuser", new String[]{"mygroup"});
-      List<Fallible<PathCacheEntry>> addResults1 = testUgi.doAs(
-          new PrivilegedExceptionAction<List<Fallible<PathCacheEntry>>>() {
-            @Override
-            public List<Fallible<PathCacheEntry>> run() throws IOException {
-              List<Fallible<PathCacheEntry>> entries;
-              entries = proto.addPathCacheDirectives(
-                  Arrays.asList(new PathCacheDirective[] {
-                      new PathCacheDirective("/alpha", pool1.getId()),
-                      new PathCacheDirective("/beta", pool2.getId()),
-                      new PathCacheDirective("", pool3.getId()),
-                      new PathCacheDirective("/zeta", 404),
-                      new PathCacheDirective("/zeta", pool4.getId())
-                  }));
-              return entries;
+      final NamenodeProtocols proto = cluster.getNameNodeRpc();
+      proto.addCachePool(new CachePoolInfo("pool1").
+          setMode(new FsPermission((short)0777)));
+      proto.addCachePool(new CachePoolInfo("pool2").
+          setMode(new FsPermission((short)0777)));
+      proto.addCachePool(new CachePoolInfo("pool3").
+          setMode(new FsPermission((short)0777)));
+      proto.addCachePool(new CachePoolInfo("pool4").
+          setMode(new FsPermission((short)0)));
+
+      List<Fallible<PathCacheEntry>> addResults1 = 
+        unprivilegedUser.doAs(new PrivilegedExceptionAction<
+            List<Fallible<PathCacheEntry>>>() {
+          @Override
+          public List<Fallible<PathCacheEntry>> run() throws IOException {
+            return proto.addPathCacheDirectives(Arrays.asList(
+              new PathCacheDirective[] {
+                new PathCacheDirective("/alpha", "pool1"),
+                new PathCacheDirective("/beta", "pool2"),
+                new PathCacheDirective("", "pool3"),
+                new PathCacheDirective("/zeta", "nonexistent_pool"),
+                new PathCacheDirective("/zeta", "pool4")
+              }));
             }
-      });
-      // Save the successful additions
+          });
       long ids1[] = new long[2];
-      for (int i=0; i<2; i++) {
-        ids1[i] = addResults1.get(i).get().getEntryId();
-      }
-      // Verify that the unsuccessful additions failed properly
+      ids1[0] = addResults1.get(0).get().getEntryId();
+      ids1[1] = addResults1.get(1).get().getEntryId();
       try {
         addResults1.get(2).get();
         Assert.fail("expected an error when adding an empty path");
@@ -201,7 +187,7 @@ public class TestPathCacheRequests {
         addResults1.get(3).get();
         Assert.fail("expected an error when adding to a nonexistent pool.");
       } catch (IOException ioe) {
-        Assert.assertTrue(ioe.getCause() instanceof InvalidPoolError);
+        Assert.assertTrue(ioe.getCause() instanceof InvalidPoolNameError);
       }
       try {
         addResults1.get(4).get();
@@ -215,10 +201,10 @@ public class TestPathCacheRequests {
       List<Fallible<PathCacheEntry>> addResults2 = 
           proto.addPathCacheDirectives(Arrays.asList(
             new PathCacheDirective[] {
-        new PathCacheDirective("/alpha", pool1.getId()),
-        new PathCacheDirective("/theta", 404),
-        new PathCacheDirective("bogus", pool1.getId()),
-        new PathCacheDirective("/gamma", pool1.getId())
+        new PathCacheDirective("/alpha", "pool1"),
+        new PathCacheDirective("/theta", ""),
+        new PathCacheDirective("bogus", "pool1"),
+        new PathCacheDirective("/gamma", "pool1")
       }));
       long id = addResults2.get(0).get().getEntryId();
       Assert.assertEquals("expected to get back the same ID as last time " +
@@ -228,7 +214,7 @@ public class TestPathCacheRequests {
         Assert.fail("expected an error when adding a path cache " +
             "directive with an empty pool name.");
       } catch (IOException ioe) {
-        Assert.assertTrue(ioe.getCause() instanceof InvalidPoolError);
+        Assert.assertTrue(ioe.getCause() instanceof InvalidPoolNameError);
       }
       try {
         addResults2.get(2).get();
@@ -240,16 +226,14 @@ public class TestPathCacheRequests {
       long ids2[] = new long[1];
       ids2[0] = addResults2.get(3).get().getEntryId();
 
-      // Validate listing all entries
       RemoteIterator<PathCacheEntry> iter =
-          proto.listPathCacheEntries(-1l, -1l, 100);
+          proto.listPathCacheEntries(0, "");
       validateListAll(iter, ids1[0], ids1[1], ids2[0]);
-      iter = proto.listPathCacheEntries(-1l, -1l, 1);
+      iter = proto.listPathCacheEntries(0, "");
       validateListAll(iter, ids1[0], ids1[1], ids2[0]);
-      // Validate listing certain pools
-      iter = proto.listPathCacheEntries(0, pool3.getId(), 1);
+      iter = proto.listPathCacheEntries(0, "pool3");
       Assert.assertFalse(iter.hasNext());
-      iter = proto.listPathCacheEntries(0, pool2.getId(), 4444);
+      iter = proto.listPathCacheEntries(0, "pool2");
       Assert.assertEquals(addResults1.get(1).get(),
           iter.next());
       Assert.assertFalse(iter.hasNext());
@@ -271,7 +255,7 @@ public class TestPathCacheRequests {
       } catch (IOException ioe) {
         Assert.assertTrue(ioe.getCause() instanceof NoSuchIdException);
       }
-      iter = proto.listPathCacheEntries(0, pool2.getId(), 4444);
+      iter = proto.listPathCacheEntries(0, "pool2");
       Assert.assertFalse(iter.hasNext());
     } finally {
       if (cluster != null) { cluster.shutdown(); }


Reply via email to