Repository: hadoop
Updated Branches:
  refs/heads/trunk 77b015000 -> fa121eb66


HDFS-13790. RBF: Move ClientProtocol APIs to its own module. Contributed by 
Chao Sun.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa121eb6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa121eb6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa121eb6

Branch: refs/heads/trunk
Commit: fa121eb66bc42e9cb5586f8c2e268cfdc2ed187a
Parents: 77b0150
Author: Brahma Reddy Battula <bra...@apache.org>
Authored: Fri Aug 17 15:22:55 2018 +0530
Committer: Brahma Reddy Battula <bra...@apache.org>
Committed: Fri Aug 17 15:22:55 2018 +0530

----------------------------------------------------------------------
 .../federation/router/RouterRpcServer.java      | 1360 ++----------------
 1 file changed, 158 insertions(+), 1202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa121eb6/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 29f32a6..fe54993 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -33,16 +33,12 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
@@ -54,7 +50,6 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.XAttr;
@@ -64,7 +59,6 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.AddBlockFlag;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
@@ -93,7 +87,6 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
-import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -101,8 +94,8 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
@@ -167,11 +160,6 @@ public class RouterRpcServer extends AbstractService
   /** Configuration for the RPC server. */
   private Configuration conf;
 
-  /** Identifier for the super user. */
-  private final String superUser;
-  /** Identifier for the super group. */
-  private final String superGroup;
-
   /** Router using this RPC server. */
   private final Router router;
 
@@ -199,11 +187,10 @@ public class RouterRpcServer extends AbstractService
   // Modules implementing groups of RPC calls
   /** Router Quota calls. */
   private final Quota quotaCall;
-  /** Erasure coding calls. */
-  private final ErasureCoding erasureCoding;
   /** NamenodeProtocol calls. */
   private final RouterNamenodeProtocol nnProto;
-
+  /** ClientProtocol calls. */
+  private final RouterClientProtocol clientProto;
 
   /**
    * Construct a router RPC server.
@@ -223,12 +210,6 @@ public class RouterRpcServer extends AbstractService
     this.namenodeResolver = nnResolver;
     this.subclusterResolver = fileResolver;
 
-    // User and group for reporting
-    this.superUser = System.getProperty("user.name");
-    this.superGroup = this.conf.get(
-        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
-        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
-
     // RPC server settings
     int handlerCount = this.conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY,
         DFS_ROUTER_HANDLER_COUNT_DEFAULT);
@@ -315,8 +296,8 @@ public class RouterRpcServer extends AbstractService
 
     // Initialize modules
     this.quotaCall = new Quota(this.router, this);
-    this.erasureCoding = new ErasureCoding(this);
     this.nnProto = new RouterNamenodeProtocol(this);
+    this.clientProto = new RouterClientProtocol(conf, this);
   }
 
   @Override
@@ -371,6 +352,13 @@ public class RouterRpcServer extends AbstractService
   }
 
   /**
+   * Get the active namenode resolver
+   */
+  public ActiveNamenodeResolver getNamenodeResolver() {
+    return namenodeResolver;
+  }
+
+  /**
    * Get the RPC monitor and metrics.
    *
    * @return RPC monitor and metrics.
@@ -411,7 +399,7 @@ public class RouterRpcServer extends AbstractService
    *                           client requests.
    * @throws UnsupportedOperationException If the operation is not supported.
    */
-  protected void checkOperation(OperationCategory op, boolean supported)
+  void checkOperation(OperationCategory op, boolean supported)
       throws StandbyException, UnsupportedOperationException {
     checkOperation(op);
 
@@ -433,7 +421,7 @@ public class RouterRpcServer extends AbstractService
    * @throws SafeModeException If the Router is in safe mode and cannot serve
    *                           client requests.
    */
-  protected void checkOperation(OperationCategory op)
+  void checkOperation(OperationCategory op)
       throws StandbyException {
     // Log the function we are currently calling.
     if (rpcMonitor != null) {
@@ -464,58 +452,44 @@ public class RouterRpcServer extends AbstractService
     }
   }
 
+  /**
+   * Get the name of the method that is calling this function.
+   *
+   * @return Name of the method calling this function.
+   */
+  static String getMethodName() {
+    final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+    String methodName = stack[3].getMethodName();
+    return methodName;
+  }
+
   @Override // ClientProtocol
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
-    return null;
-  }
-
-  /**
-   * The the delegation token from each name service.
-   * @param renewer
-   * @return Name service -> Token.
-   * @throws IOException
-   */
-  public Map<FederationNamespaceInfo, Token<DelegationTokenIdentifier>>
-      getDelegationTokens(Text renewer) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
-    return null;
+    return clientProto.getDelegationToken(renewer);
   }
 
   @Override // ClientProtocol
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
-    return 0;
+    return clientProto.renewDelegationToken(token);
   }
 
   @Override // ClientProtocol
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.cancelDelegationToken(token);
   }
 
   @Override // ClientProtocol
   public LocatedBlocks getBlockLocations(String src, final long offset,
       final long length) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    List<RemoteLocation> locations = getLocationsForPath(src, false);
-    RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations",
-        new Class<?>[] {String.class, long.class, long.class},
-        new RemoteParam(), offset, length);
-    return (LocatedBlocks) rpcClient.invokeSequential(locations, remoteMethod,
-        LocatedBlocks.class, null);
+    return clientProto.getBlockLocations(src, offset, length);
   }
 
   @Override // ClientProtocol
   public FsServerDefaults getServerDefaults() throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod("getServerDefaults");
-    String ns = subclusterResolver.getDefaultNamespace();
-    return (FsServerDefaults) rpcClient.invokeSingle(ns, method);
+    return clientProto.getServerDefaults();
   }
 
   @Override // ClientProtocol
@@ -524,44 +498,8 @@ public class RouterRpcServer extends AbstractService
       boolean createParent, short replication, long blockSize,
       CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    if (createParent && isPathAll(src)) {
-      int index = src.lastIndexOf(Path.SEPARATOR);
-      String parent = src.substring(0, index);
-      LOG.debug("Creating {} requires creating parent {}", src, parent);
-      FsPermission parentPermissions = getParentPermission(masked);
-      boolean success = mkdirs(parent, parentPermissions, createParent);
-      if (!success) {
-        // This shouldn't happen as mkdirs returns true or exception
-        LOG.error("Couldn't create parents for {}", src);
-      }
-    }
-
-    RemoteLocation createLocation = getCreateLocation(src);
-    RemoteMethod method = new RemoteMethod("create",
-        new Class<?>[] {String.class, FsPermission.class, String.class,
-                        EnumSetWritable.class, boolean.class, short.class,
-                        long.class, CryptoProtocolVersion[].class,
-                        String.class},
-        createLocation.getDest(), masked, clientName, flag, createParent,
+    return clientProto.create(src, masked, clientName, flag, createParent,
         replication, blockSize, supportedVersions, ecPolicyName);
-    return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
-  }
-
-  /**
-   * Get the permissions for the parent of a child with given permissions.
-   * Add implicit u+wx permission for parent. This is based on
-   * @{FSDirMkdirOp#addImplicitUwx}.
-   * @param mask The permission mask of the child.
-   * @return The permission mask of the parent.
-   */
-  private static FsPermission getParentPermission(final FsPermission mask) {
-    FsPermission ret = new FsPermission(
-        mask.getUserAction().or(FsAction.WRITE_EXECUTE),
-        mask.getGroupAction(),
-        mask.getOtherAction());
-    return ret;
   }
 
   /**
@@ -572,7 +510,7 @@ public class RouterRpcServer extends AbstractService
    * @return The remote location for this file.
    * @throws IOException If the file has no creation location.
    */
-  protected RemoteLocation getCreateLocation(final String src)
+  RemoteLocation getCreateLocation(final String src)
       throws IOException {
 
     final List<RemoteLocation> locations = getLocationsForPath(src, true);
@@ -613,100 +551,45 @@ public class RouterRpcServer extends AbstractService
     return createLocation;
   }
 
-  // Medium
   @Override // ClientProtocol
   public LastBlockWithStatus append(String src, final String clientName,
       final EnumSetWritable<CreateFlag> flag) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("append",
-        new Class<?>[] {String.class, String.class, EnumSetWritable.class},
-        new RemoteParam(), clientName, flag);
-    return rpcClient.invokeSequential(
-        locations, method, LastBlockWithStatus.class, null);
+    return clientProto.append(src, clientName, flag);
   }
 
-  // Low
   @Override // ClientProtocol
   public boolean recoverLease(String src, String clientName)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("recoverLease",
-        new Class<?>[] {String.class, String.class}, new RemoteParam(),
-        clientName);
-    Object result = rpcClient.invokeSequential(
-        locations, method, Boolean.class, Boolean.TRUE);
-    return (boolean) result;
+    return clientProto.recoverLease(src, clientName);
   }
 
   @Override // ClientProtocol
   public boolean setReplication(String src, short replication)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("setReplication",
-        new Class<?>[] {String.class, short.class}, new RemoteParam(),
-        replication);
-    Object result = rpcClient.invokeSequential(
-        locations, method, Boolean.class, Boolean.TRUE);
-    return (boolean) result;
+    return clientProto.setReplication(src, replication);
   }
 
-  @Override
+  @Override // ClientProtocol
   public void setStoragePolicy(String src, String policyName)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("setStoragePolicy",
-        new Class<?>[] {String.class, String.class},
-        new RemoteParam(), policyName);
-    rpcClient.invokeSequential(locations, method, null, null);
+    clientProto.setStoragePolicy(src, policyName);
   }
 
-  @Override
+  @Override // ClientProtocol
   public BlockStoragePolicy[] getStoragePolicies() throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod("getStoragePolicies");
-    String ns = subclusterResolver.getDefaultNamespace();
-    return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
+    return clientProto.getStoragePolicies();
   }
 
   @Override // ClientProtocol
   public void setPermission(String src, FsPermission permissions)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("setPermission",
-        new Class<?>[] {String.class, FsPermission.class},
-        new RemoteParam(), permissions);
-    if (isPathAll(src)) {
-      rpcClient.invokeConcurrent(locations, method);
-    } else {
-      rpcClient.invokeSequential(locations, method);
-    }
+    clientProto.setPermission(src, permissions);
   }
 
   @Override // ClientProtocol
   public void setOwner(String src, String username, String groupname)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("setOwner",
-        new Class<?>[] {String.class, String.class, String.class},
-        new RemoteParam(), username, groupname);
-    if (isPathAll(src)) {
-      rpcClient.invokeConcurrent(locations, method);
-    } else {
-      rpcClient.invokeSequential(locations, method);
-    }
+    clientProto.setOwner(src, username, groupname);
   }
 
   /**
@@ -718,18 +601,8 @@ public class RouterRpcServer extends AbstractService
       ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
       String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("addBlock",
-        new Class<?>[] {String.class, String.class, ExtendedBlock.class,
-                        DatanodeInfo[].class, long.class, String[].class,
-                        EnumSet.class},
-        new RemoteParam(), clientName, previous, excludedNodes, fileId,
-        favoredNodes, addBlockFlags);
-    // TODO verify the excludedNodes and favoredNodes are acceptable to this NN
-    return (LocatedBlock) rpcClient.invokeSequential(
-        locations, method, LocatedBlock.class, null);
+    return clientProto.addBlock(src, clientName, previous, excludedNodes,
+        fileId, favoredNodes, addBlockFlags);
   }
 
   /**
@@ -742,55 +615,26 @@ public class RouterRpcServer extends AbstractService
       final String[] existingStorageIDs, final DatanodeInfo[] excludes,
       final int numAdditionalNodes, final String clientName)
           throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, false);
-    RemoteMethod method = new RemoteMethod("getAdditionalDatanode",
-        new Class<?>[] {String.class, long.class, ExtendedBlock.class,
-                        DatanodeInfo[].class, String[].class,
-                        DatanodeInfo[].class, int.class, String.class},
-        new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes,
-        numAdditionalNodes, clientName);
-    return (LocatedBlock) rpcClient.invokeSequential(
-        locations, method, LocatedBlock.class, null);
+    return clientProto.getAdditionalDatanode(src, fileId, blk, existings,
+        existingStorageIDs, excludes, numAdditionalNodes, clientName);
   }
 
   @Override // ClientProtocol
   public void abandonBlock(ExtendedBlock b, long fileId, String src,
       String holder) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    RemoteMethod method = new RemoteMethod("abandonBlock",
-        new Class<?>[] {ExtendedBlock.class, long.class, String.class,
-                        String.class},
-        b, fileId, new RemoteParam(), holder);
-    rpcClient.invokeSingle(b, method);
+    clientProto.abandonBlock(b, fileId, src, holder);
   }
 
   @Override // ClientProtocol
   public boolean complete(String src, String clientName, ExtendedBlock last,
       long fileId) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("complete",
-        new Class<?>[] {String.class, String.class, ExtendedBlock.class,
-                        long.class},
-        new RemoteParam(), clientName, last, fileId);
-    // Complete can return true/false, so don't expect a result
-    return ((Boolean) rpcClient.invokeSequential(
-        locations, method, Boolean.class, null)).booleanValue();
+    return clientProto.complete(src, clientName, last, fileId);
   }
 
   @Override // ClientProtocol
   public LocatedBlock updateBlockForPipeline(
       ExtendedBlock block, String clientName) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    RemoteMethod method = new RemoteMethod("updateBlockForPipeline",
-        new Class<?>[] {ExtendedBlock.class, String.class},
-        block, clientName);
-    return (LocatedBlock) rpcClient.invokeSingle(block, method);
+    return clientProto.updateBlockForPipeline(block, clientName);
   }
 
   /**
@@ -801,462 +645,91 @@ public class RouterRpcServer extends AbstractService
   public void updatePipeline(String clientName, ExtendedBlock oldBlock,
       ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
           throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    RemoteMethod method = new RemoteMethod("updatePipeline",
-        new Class<?>[] {String.class, ExtendedBlock.class, ExtendedBlock.class,
-                        DatanodeID[].class, String[].class},
-        clientName, oldBlock, newBlock, newNodes, newStorageIDs);
-    rpcClient.invokeSingle(oldBlock, method);
+    clientProto.updatePipeline(clientName, oldBlock, newBlock, newNodes,
+        newStorageIDs);
   }
 
   @Override // ClientProtocol
   public long getPreferredBlockSize(String src) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("getPreferredBlockSize",
-        new Class<?>[] {String.class}, new RemoteParam());
-    return ((Long) rpcClient.invokeSequential(
-        locations, method, Long.class, null)).longValue();
-  }
-
-  /**
-   * Determines combinations of eligible src/dst locations for a rename. A
-   * rename cannot change the namespace. Renames are only allowed if there is an
-   * eligible dst location in the same namespace as the source.
-   *
-   * @param srcLocations List of all potential source destinations where the
-   *          path may be located. On return this list is trimmed to include
-   *          only the paths that have corresponding destinations in the same
-   *          namespace.
-   * @param dst The destination path
-   * @return A map of all eligible source namespaces and their corresponding
-   *         replacement value.
-   * @throws IOException If the dst paths could not be determined.
-   */
-  private RemoteParam getRenameDestinations(
-      final List<RemoteLocation> srcLocations, final String dst)
-          throws IOException {
-
-    final List<RemoteLocation> dstLocations = getLocationsForPath(dst, true);
-    final Map<RemoteLocation, String> dstMap = new HashMap<>();
-
-    Iterator<RemoteLocation> iterator = srcLocations.iterator();
-    while (iterator.hasNext()) {
-      RemoteLocation srcLocation = iterator.next();
-      RemoteLocation eligibleDst =
-          getFirstMatchingLocation(srcLocation, dstLocations);
-      if (eligibleDst != null) {
-        // Use this dst for this source location
-        dstMap.put(srcLocation, eligibleDst.getDest());
-      } else {
-        // This src destination is not valid, remove from the source list
-        iterator.remove();
-      }
-    }
-    return new RemoteParam(dstMap);
-  }
-
-  /**
-   * Get first matching location.
-   *
-   * @param location Location we are looking for.
-   * @param locations List of locations.
-   * @return The first matchin location in the list.
-   */
-  private RemoteLocation getFirstMatchingLocation(RemoteLocation location,
-      List<RemoteLocation> locations) {
-    for (RemoteLocation loc : locations) {
-      if (loc.getNameserviceId().equals(location.getNameserviceId())) {
-        // Return first matching location
-        return loc;
-      }
-    }
-    return null;
+    return clientProto.getPreferredBlockSize(src);
   }
 
   @Deprecated
   @Override // ClientProtocol
   public boolean rename(final String src, final String dst)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> srcLocations =
-        getLocationsForPath(src, true, false);
-    // srcLocations may be trimmed by getRenameDestinations()
-    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
-    RemoteParam dstParam = getRenameDestinations(locs, dst);
-    if (locs.isEmpty()) {
-      throw new IOException(
-          "Rename of " + src + " to " + dst + " is not allowed," +
-          " no eligible destination in the same namespace was found.");
-    }
-    RemoteMethod method = new RemoteMethod("rename",
-        new Class<?>[] {String.class, String.class},
-        new RemoteParam(), dstParam);
-    return ((Boolean) rpcClient.invokeSequential(
-        locs, method, Boolean.class, Boolean.TRUE)).booleanValue();
+    return clientProto.rename(src, dst);
   }
 
   @Override // ClientProtocol
   public void rename2(final String src, final String dst,
       final Options.Rename... options) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> srcLocations =
-        getLocationsForPath(src, true, false);
-    // srcLocations may be trimmed by getRenameDestinations()
-    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
-    RemoteParam dstParam = getRenameDestinations(locs, dst);
-    if (locs.isEmpty()) {
-      throw new IOException(
-          "Rename of " + src + " to " + dst + " is not allowed," +
-          " no eligible destination in the same namespace was found.");
-    }
-    RemoteMethod method = new RemoteMethod("rename2",
-        new Class<?>[] {String.class, String.class, options.getClass()},
-        new RemoteParam(), dstParam, options);
-    rpcClient.invokeSequential(locs, method, null, null);
+    clientProto.rename2(src, dst, options);
   }
 
   @Override // ClientProtocol
   public void concat(String trg, String[] src) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // See if the src and target files are all in the same namespace
-    LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1);
-    if (targetBlocks == null) {
-      throw new IOException("Cannot locate blocks for target file - " + trg);
-    }
-    LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock();
-    String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId();
-    for (String source : src) {
-      LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1);
-      if (sourceBlocks == null) {
-        throw new IOException(
-            "Cannot located blocks for source file " + source);
-      }
-      String sourceBlockPoolId =
-          sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId();
-      if (!sourceBlockPoolId.equals(targetBlockPoolId)) {
-        throw new IOException("Cannot concatenate source file " + source
-            + " because it is located in a different namespace"
-            + " with block pool id " + sourceBlockPoolId
-            + " from the target file with block pool id "
-            + targetBlockPoolId);
-      }
-    }
-
-    // Find locations in the matching namespace.
-    final RemoteLocation targetDestination =
-        getLocationForPath(trg, true, targetBlockPoolId);
-    String[] sourceDestinations = new String[src.length];
-    for (int i = 0; i < src.length; i++) {
-      String sourceFile = src[i];
-      RemoteLocation location =
-          getLocationForPath(sourceFile, true, targetBlockPoolId);
-      sourceDestinations[i] = location.getDest();
-    }
-    // Invoke
-    RemoteMethod method = new RemoteMethod("concat",
-        new Class<?>[] {String.class, String[].class},
-        targetDestination.getDest(), sourceDestinations);
-    rpcClient.invokeSingle(targetDestination, method);
+    clientProto.concat(trg, src);
   }
 
   @Override // ClientProtocol
   public boolean truncate(String src, long newLength, String clientName)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("truncate",
-        new Class<?>[] {String.class, long.class, String.class},
-        new RemoteParam(), newLength, clientName);
-    return ((Boolean) rpcClient.invokeSequential(locations, method,
-        Boolean.class, Boolean.TRUE)).booleanValue();
+    return clientProto.truncate(src, newLength, clientName);
   }
 
   @Override // ClientProtocol
   public boolean delete(String src, boolean recursive) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations =
-        getLocationsForPath(src, true, false);
-    RemoteMethod method = new RemoteMethod("delete",
-        new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
-        recursive);
-    if (isPathAll(src)) {
-      return rpcClient.invokeAll(locations, method);
-    } else {
-      return rpcClient.invokeSequential(locations, method,
-          Boolean.class, Boolean.TRUE).booleanValue();
-    }
+    return clientProto.delete(src, recursive);
   }
 
   @Override // ClientProtocol
   public boolean mkdirs(String src, FsPermission masked, boolean createParent)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("mkdirs",
-        new Class<?>[] {String.class, FsPermission.class, boolean.class},
-        new RemoteParam(), masked, createParent);
-
-    // Create in all locations
-    if (isPathAll(src)) {
-      return rpcClient.invokeAll(locations, method);
-    }
-
-    if (locations.size() > 1) {
-      // Check if this directory already exists
-      try {
-        HdfsFileStatus fileStatus = getFileInfo(src);
-        if (fileStatus != null) {
-          // When existing, the NN doesn't return an exception; return true
-          return true;
-        }
-      } catch (IOException ioe) {
-        // Can't query if this file exists or not.
-        LOG.error("Error requesting file info for path {} while proxing mkdirs",
-            src, ioe);
-      }
-    }
-
-    RemoteLocation firstLocation = locations.get(0);
-    return ((Boolean) rpcClient.invokeSingle(firstLocation, method))
-        .booleanValue();
+    return clientProto.mkdirs(src, masked, createParent);
   }
 
   @Override // ClientProtocol
   public void renewLease(String clientName) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    RemoteMethod method = new RemoteMethod("renewLease",
-        new Class<?>[] {String.class}, clientName);
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, false, false);
+    clientProto.renewLease(clientName);
   }
 
   @Override // ClientProtocol
   public DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    // Locate the dir and fetch the listing
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("getListing",
-        new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
-        new RemoteParam(), startAfter, needLocation);
-    Map<RemoteLocation, DirectoryListing> listings =
-        rpcClient.invokeConcurrent(
-            locations, method, false, false, DirectoryListing.class);
-
-    Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
-    int totalRemainingEntries = 0;
-    int remainingEntries = 0;
-    boolean namenodeListingExists = false;
-    if (listings != null) {
-      // Check the subcluster listing with the smallest name
-      String lastName = null;
-      for (Entry<RemoteLocation, DirectoryListing> entry :
-          listings.entrySet()) {
-        RemoteLocation location = entry.getKey();
-        DirectoryListing listing = entry.getValue();
-        if (listing == null) {
-          LOG.debug("Cannot get listing from {}", location);
-        } else {
-          totalRemainingEntries += listing.getRemainingEntries();
-          HdfsFileStatus[] partialListing = listing.getPartialListing();
-          int length = partialListing.length;
-          if (length > 0) {
-            HdfsFileStatus lastLocalEntry = partialListing[length-1];
-            String lastLocalName = lastLocalEntry.getLocalName();
-            if (lastName == null || lastName.compareTo(lastLocalName) > 0) {
-              lastName = lastLocalName;
-            }
-          }
-        }
-      }
-
-      // Add existing entries
-      for (Object value : listings.values()) {
-        DirectoryListing listing = (DirectoryListing) value;
-        if (listing != null) {
-          namenodeListingExists = true;
-          for (HdfsFileStatus file : listing.getPartialListing()) {
-            String filename = file.getLocalName();
-            if (totalRemainingEntries > 0 && filename.compareTo(lastName) > 0) {
-              // Discarding entries further than the lastName
-              remainingEntries++;
-            } else {
-              nnListing.put(filename, file);
-            }
-          }
-          remainingEntries += listing.getRemainingEntries();
-        }
-      }
-    }
-
-    // Add mount points at this level in the tree
-    final List<String> children = subclusterResolver.getMountPoints(src);
-    if (children != null) {
-      // Get the dates for each mount point
-      Map<String, Long> dates = getMountPointDates(src);
-
-      // Create virtual folder with the mount name
-      for (String child : children) {
-        long date = 0;
-        if (dates != null && dates.containsKey(child)) {
-          date = dates.get(child);
-        }
-        // TODO add number of children
-        HdfsFileStatus dirStatus = getMountPointStatus(child, 0, date);
-
-        // This may overwrite existing listing entries with the mount point
-        // TODO don't add if already there?
-        nnListing.put(child, dirStatus);
-      }
-    }
-
-    if (!namenodeListingExists && nnListing.size() == 0) {
-      // NN returns a null object if the directory cannot be found and has no
-      // listing. If we didn't retrieve any NN listing data, and there are no
-      // mount points here, return null.
-      return null;
-    }
-
-    // Generate combined listing
-    HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()];
-    combinedData = nnListing.values().toArray(combinedData);
-    return new DirectoryListing(combinedData, remainingEntries);
+    return clientProto.getListing(src, startAfter, needLocation);
   }
 
   @Override // ClientProtocol
   public HdfsFileStatus getFileInfo(String src) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, false);
-    RemoteMethod method = new RemoteMethod("getFileInfo",
-        new Class<?>[] {String.class}, new RemoteParam());
-
-    HdfsFileStatus ret = null;
-    // If it's a directory, we check in all locations
-    if (isPathAll(src)) {
-      ret = getFileInfoAll(locations, method);
-    } else {
-      // Check for file information sequentially
-      ret = (HdfsFileStatus) rpcClient.invokeSequential(
-          locations, method, HdfsFileStatus.class, null);
-    }
-
-    // If there is no real path, check mount points
-    if (ret == null) {
-      List<String> children = subclusterResolver.getMountPoints(src);
-      if (children != null && !children.isEmpty()) {
-        Map<String, Long> dates = getMountPointDates(src);
-        long date = 0;
-        if (dates != null && dates.containsKey(src)) {
-          date = dates.get(src);
-        }
-        ret = getMountPointStatus(src, children.size(), date);
-      }
-    }
-
-    return ret;
-  }
-
-  /**
-   * Get the file info from all the locations.
-   *
-   * @param locations Locations to check.
-   * @param method The file information method to run.
-   * @return The first file info if it's a file, the directory if it's
-   *         everywhere.
-   * @throws IOException If all the locations throw an exception.
-   */
-  private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
-      final RemoteMethod method) throws IOException {
-
-    // Get the file info from everybody
-    Map<RemoteLocation, HdfsFileStatus> results =
-        rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class);
-
-    // We return the first file
-    HdfsFileStatus dirStatus = null;
-    for (RemoteLocation loc : locations) {
-      HdfsFileStatus fileStatus = results.get(loc);
-      if (fileStatus != null) {
-        if (!fileStatus.isDirectory()) {
-          return fileStatus;
-        } else if (dirStatus == null) {
-          dirStatus = fileStatus;
-        }
-      }
-    }
-    return dirStatus;
+    return clientProto.getFileInfo(src);
   }
 
   @Override // ClientProtocol
   public boolean isFileClosed(String src) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, false);
-    RemoteMethod method = new RemoteMethod("isFileClosed",
-        new Class<?>[] {String.class}, new RemoteParam());
-    return ((Boolean) rpcClient.invokeSequential(
-        locations, method, Boolean.class, Boolean.TRUE)).booleanValue();
+    return clientProto.isFileClosed(src);
   }
 
   @Override // ClientProtocol
   public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, false);
-    RemoteMethod method = new RemoteMethod("getFileLinkInfo",
-        new Class<?>[] {String.class}, new RemoteParam());
-    return (HdfsFileStatus) rpcClient.invokeSequential(
-        locations, method, HdfsFileStatus.class, null);
+    return clientProto.getFileLinkInfo(src);
   }
 
-  @Override
+  @Override // ClientProtocol
   public HdfsLocatedFileStatus getLocatedFileInfo(String src,
       boolean needBlockToken) throws IOException {
-    checkOperation(OperationCategory.READ);
-    final List<RemoteLocation> locations = getLocationsForPath(src, false);
-    RemoteMethod method = new RemoteMethod("getLocatedFileInfo",
-        new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
-        Boolean.valueOf(needBlockToken));
-    return (HdfsLocatedFileStatus) rpcClient.invokeSequential(
-        locations, method, HdfsFileStatus.class, null);
+    return clientProto.getLocatedFileInfo(src, needBlockToken);
   }
 
   @Override // ClientProtocol
   public long[] getStats() throws IOException {
-    checkOperation(OperationCategory.UNCHECKED);
-
-    RemoteMethod method = new RemoteMethod("getStats");
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, long[]> results =
-        rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
-    long[] combinedData = new long[STATS_ARRAY_LENGTH];
-    for (long[] data : results.values()) {
-      for (int i = 0; i < combinedData.length && i < data.length; i++) {
-        if (data[i] >= 0) {
-          combinedData[i] += data[i];
-        }
-      }
-    }
-    return combinedData;
+    return clientProto.getStats();
   }
 
   @Override // ClientProtocol
   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
       throws IOException {
-    checkOperation(OperationCategory.UNCHECKED);
-    return getDatanodeReport(type, true, 0);
+    return clientProto.getDatanodeReport(type);
   }
 
   /**
@@ -1305,29 +778,7 @@ public class RouterRpcServer extends AbstractService
   @Override // ClientProtocol
   public DatanodeStorageReport[] getDatanodeStorageReport(
       DatanodeReportType type) throws IOException {
-    checkOperation(OperationCategory.UNCHECKED);
-
-    Map<String, DatanodeStorageReport[]> dnSubcluster =
-        getDatanodeStorageReportMap(type);
-
-    // Avoid repeating machines in multiple subclusters
-    Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
-    for (DatanodeStorageReport[] dns : dnSubcluster.values()) {
-      for (DatanodeStorageReport dn : dns) {
-        DatanodeInfo dnInfo = dn.getDatanodeInfo();
-        String nodeId = dnInfo.getXferAddr();
-        if (!datanodesMap.containsKey(nodeId)) {
-          datanodesMap.put(nodeId, dn);
-        }
-        // TODO merge somehow, right now it just takes the first one
-      }
-    }
-
-    Collection<DatanodeStorageReport> datanodes = datanodesMap.values();
-    DatanodeStorageReport[] combinedData =
-        new DatanodeStorageReport[datanodes.size()];
-    combinedData = datanodes.toArray(combinedData);
-    return combinedData;
+    return clientProto.getDatanodeStorageReport(type);
   }
 
   /**
@@ -1360,740 +811,388 @@ public class RouterRpcServer extends AbstractService
   @Override // ClientProtocol
   public boolean setSafeMode(SafeModeAction action, boolean isChecked)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // Set safe mode in all the name spaces
-    RemoteMethod method = new RemoteMethod("setSafeMode",
-        new Class<?>[] {SafeModeAction.class, boolean.class},
-        action, isChecked);
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Boolean> results =
-        rpcClient.invokeConcurrent(
-            nss, method, true, !isChecked, Boolean.class);
-
-    // We only report true if all the name space are in safe mode
-    int numSafemode = 0;
-    for (boolean safemode : results.values()) {
-      if (safemode) {
-        numSafemode++;
-      }
-    }
-    return numSafemode == results.size();
+    return clientProto.setSafeMode(action, isChecked);
   }
 
   @Override // ClientProtocol
   public boolean restoreFailedStorage(String arg) throws IOException {
-    checkOperation(OperationCategory.UNCHECKED);
-
-    RemoteMethod method = new RemoteMethod("restoreFailedStorage",
-        new Class<?>[] {String.class}, arg);
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Boolean> ret =
-        rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
-
-    boolean success = true;
-    for (boolean s : ret.values()) {
-      if (!s) {
-        success = false;
-        break;
-      }
-    }
-    return success;
+    return clientProto.restoreFailedStorage(arg);
   }
 
   @Override // ClientProtocol
   public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
-    checkOperation(OperationCategory.UNCHECKED);
-
-    RemoteMethod method = new RemoteMethod("saveNamespace",
-        new Class<?>[] {Long.class, Long.class}, timeWindow, txGap);
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Boolean> ret =
-        rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
-
-    boolean success = true;
-    for (boolean s : ret.values()) {
-      if (!s) {
-        success = false;
-        break;
-      }
-    }
-    return success;
+    return clientProto.saveNamespace(timeWindow, txGap);
   }
 
   @Override // ClientProtocol
   public long rollEdits() throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {});
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Long> ret =
-        rpcClient.invokeConcurrent(nss, method, true, false, long.class);
-
-    // Return the maximum txid
-    long txid = 0;
-    for (long t : ret.values()) {
-      if (t > txid) {
-        txid = t;
-      }
-    }
-    return txid;
+    return clientProto.rollEdits();
   }
 
   @Override // ClientProtocol
   public void refreshNodes() throws IOException {
-    checkOperation(OperationCategory.UNCHECKED);
-
-    RemoteMethod method = new RemoteMethod("refreshNodes", new Class<?>[] {});
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, true);
+    clientProto.refreshNodes();
   }
 
   @Override // ClientProtocol
   public void finalizeUpgrade() throws IOException {
-    checkOperation(OperationCategory.UNCHECKED);
-
-    RemoteMethod method = new RemoteMethod("finalizeUpgrade",
-        new Class<?>[] {});
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, false);
+    clientProto.finalizeUpgrade();
   }
 
   @Override // ClientProtocol
   public boolean upgradeStatus() throws IOException {
-    String methodName = getMethodName();
-    throw new UnsupportedOperationException(
-        "Operation \"" + methodName + "\" is not supported");
+    return clientProto.upgradeStatus();
   }
 
   @Override // ClientProtocol
   public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
       throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod("rollingUpgrade",
-        new Class<?>[] {RollingUpgradeAction.class}, action);
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, RollingUpgradeInfo> ret =
-        rpcClient.invokeConcurrent(
-            nss, method, true, false, RollingUpgradeInfo.class);
-
-    // Return the first rolling upgrade info
-    RollingUpgradeInfo info = null;
-    for (RollingUpgradeInfo infoNs : ret.values()) {
-      if (info == null && infoNs != null) {
-        info = infoNs;
-      }
-    }
-    return info;
+    return clientProto.rollingUpgrade(action);
   }
 
   @Override // ClientProtocol
   public void metaSave(String filename) throws IOException {
-    checkOperation(OperationCategory.UNCHECKED);
-
-    RemoteMethod method = new RemoteMethod("metaSave",
-        new Class<?>[] {String.class}, filename);
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, false);
+    clientProto.metaSave(filename);
   }
 
   @Override // ClientProtocol
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    final List<RemoteLocation> locations = getLocationsForPath(path, false);
-    RemoteMethod method = new RemoteMethod("listCorruptFileBlocks",
-        new Class<?>[] {String.class, String.class},
-        new RemoteParam(), cookie);
-    return (CorruptFileBlocks) rpcClient.invokeSequential(
-        locations, method, CorruptFileBlocks.class, null);
+    return clientProto.listCorruptFileBlocks(path, cookie);
   }
 
   @Override // ClientProtocol
   public void setBalancerBandwidth(long bandwidth) throws IOException {
-    checkOperation(OperationCategory.UNCHECKED);
-
-    RemoteMethod method = new RemoteMethod("setBalancerBandwidth",
-        new Class<?>[] {Long.class}, bandwidth);
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    rpcClient.invokeConcurrent(nss, method, true, false);
+    clientProto.setBalancerBandwidth(bandwidth);
   }
 
   @Override // ClientProtocol
   public ContentSummary getContentSummary(String path) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    // Get the summaries from regular files
-    Collection<ContentSummary> summaries = new LinkedList<>();
-    FileNotFoundException notFoundException = null;
-    try {
-      final List<RemoteLocation> locations = getLocationsForPath(path, false);
-      RemoteMethod method = new RemoteMethod("getContentSummary",
-          new Class<?>[] {String.class}, new RemoteParam());
-      Map<RemoteLocation, ContentSummary> results =
-          rpcClient.invokeConcurrent(
-              locations, method, false, false, ContentSummary.class);
-      summaries.addAll(results.values());
-    } catch (FileNotFoundException e) {
-      notFoundException = e;
-    }
-
-    // Add mount points at this level in the tree
-    final List<String> children = subclusterResolver.getMountPoints(path);
-    if (children != null) {
-      for (String child : children) {
-        Path childPath = new Path(path, child);
-        try {
-          ContentSummary mountSummary = getContentSummary(childPath.toString());
-          if (mountSummary != null) {
-            summaries.add(mountSummary);
-          }
-        } catch (Exception e) {
-          LOG.error("Cannot get content summary for mount {}: {}",
-              childPath, e.getMessage());
-        }
-      }
-    }
-
-    // Throw original exception if no original nor mount points
-    if (summaries.isEmpty() && notFoundException != null) {
-      throw notFoundException;
-    }
-
-    return aggregateContentSummary(summaries);
-  }
-
-  /**
-   * Aggregate content summaries for each subcluster.
-   *
-   * @param summaries Collection of individual summaries.
-   * @return Aggregated content summary.
-   */
-  private ContentSummary aggregateContentSummary(
-      Collection<ContentSummary> summaries) {
-    if (summaries.size() == 1) {
-      return summaries.iterator().next();
-    }
-
-    long length = 0;
-    long fileCount = 0;
-    long directoryCount = 0;
-    long quota = 0;
-    long spaceConsumed = 0;
-    long spaceQuota = 0;
-
-    for (ContentSummary summary : summaries) {
-      length += summary.getLength();
-      fileCount += summary.getFileCount();
-      directoryCount += summary.getDirectoryCount();
-      quota += summary.getQuota();
-      spaceConsumed += summary.getSpaceConsumed();
-      spaceQuota += summary.getSpaceQuota();
-    }
-
-    ContentSummary ret = new ContentSummary.Builder()
-        .length(length)
-        .fileCount(fileCount)
-        .directoryCount(directoryCount)
-        .quota(quota)
-        .spaceConsumed(spaceConsumed)
-        .spaceQuota(spaceQuota)
-        .build();
-    return ret;
+    return clientProto.getContentSummary(path);
   }
 
   @Override // ClientProtocol
   public void fsync(String src, long fileId, String clientName,
       long lastBlockLength) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("fsync",
-        new Class<?>[] {String.class, long.class, String.class, long.class },
-        new RemoteParam(), fileId, clientName, lastBlockLength);
-    rpcClient.invokeSequential(locations, method);
+    clientProto.fsync(src, fileId, clientName, lastBlockLength);
   }
 
   @Override // ClientProtocol
   public void setTimes(String src, long mtime, long atime) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("setTimes",
-        new Class<?>[] {String.class, long.class, long.class},
-        new RemoteParam(), mtime, atime);
-    rpcClient.invokeSequential(locations, method);
+    clientProto.setTimes(src, mtime, atime);
   }
 
   @Override // ClientProtocol
   public void createSymlink(String target, String link, FsPermission dirPerms,
       boolean createParent) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // TODO Verify that the link location is in the same NS as the targets
-    final List<RemoteLocation> targetLocations =
-        getLocationsForPath(target, true);
-    final List<RemoteLocation> linkLocations =
-        getLocationsForPath(link, true);
-    RemoteLocation linkLocation = linkLocations.get(0);
-    RemoteMethod method = new RemoteMethod("createSymlink",
-        new Class<?>[] {String.class, String.class, FsPermission.class,
-                        boolean.class},
-        new RemoteParam(), linkLocation.getDest(), dirPerms, createParent);
-    rpcClient.invokeSequential(targetLocations, method);
+    clientProto.createSymlink(target, link, dirPerms, createParent);
   }
 
   @Override // ClientProtocol
   public String getLinkTarget(String path) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    final List<RemoteLocation> locations = getLocationsForPath(path, true);
-    RemoteMethod method = new RemoteMethod("getLinkTarget",
-        new Class<?>[] {String.class}, new RemoteParam());
-    return (String) rpcClient.invokeSequential(
-        locations, method, String.class, null);
+    return clientProto.getLinkTarget(path);
   }
 
   @Override // Client Protocol
   public void allowSnapshot(String snapshotRoot) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.allowSnapshot(snapshotRoot);
   }
 
   @Override // Client Protocol
   public void disallowSnapshot(String snapshot) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.disallowSnapshot(snapshot);
   }
 
   @Override // ClientProtocol
   public void renameSnapshot(String snapshotRoot, String snapshotOldName,
       String snapshotNewName) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName);
   }
 
   @Override // Client Protocol
   public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
       throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.getSnapshottableDirListing();
   }
 
   @Override // ClientProtocol
   public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
       String earlierSnapshotName, String laterSnapshotName) throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.getSnapshotDiffReport(
+        snapshotRoot, earlierSnapshotName, laterSnapshotName);
   }
 
   @Override // ClientProtocol
   public SnapshotDiffReportListing getSnapshotDiffReportListing(
       String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
       byte[] startPath, int index) throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.getSnapshotDiffReportListing(snapshotRoot,
+        earlierSnapshotName, laterSnapshotName, startPath, index);
   }
 
   @Override // ClientProtocol
   public long addCacheDirective(CacheDirectiveInfo path,
       EnumSet<CacheFlag> flags) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
-    return 0;
+    return clientProto.addCacheDirective(path, flags);
   }
 
   @Override // ClientProtocol
   public void modifyCacheDirective(CacheDirectiveInfo directive,
       EnumSet<CacheFlag> flags) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.modifyCacheDirective(directive, flags);
   }
 
   @Override // ClientProtocol
   public void removeCacheDirective(long id) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.removeCacheDirective(id);
   }
 
   @Override // ClientProtocol
   public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
       long prevId, CacheDirectiveInfo filter) throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.listCacheDirectives(prevId, filter);
   }
 
   @Override // ClientProtocol
   public void addCachePool(CachePoolInfo info) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.addCachePool(info);
   }
 
   @Override // ClientProtocol
   public void modifyCachePool(CachePoolInfo info) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.modifyCachePool(info);
   }
 
   @Override // ClientProtocol
   public void removeCachePool(String cachePoolName) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.removeCachePool(cachePoolName);
   }
 
   @Override // ClientProtocol
   public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
       throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.listCachePools(prevKey);
   }
 
   @Override // ClientProtocol
   public void modifyAclEntries(String src, List<AclEntry> aclSpec)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("modifyAclEntries",
-        new Class<?>[] {String.class, List.class},
-        new RemoteParam(), aclSpec);
-    rpcClient.invokeSequential(locations, method, null, null);
+    clientProto.modifyAclEntries(src, aclSpec);
   }
 
   @Override // ClienProtocol
   public void removeAclEntries(String src, List<AclEntry> aclSpec)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("removeAclEntries",
-        new Class<?>[] {String.class, List.class},
-        new RemoteParam(), aclSpec);
-    rpcClient.invokeSequential(locations, method, null, null);
+    clientProto.removeAclEntries(src, aclSpec);
   }
 
   @Override // ClientProtocol
   public void removeDefaultAcl(String src) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("removeDefaultAcl",
-        new Class<?>[] {String.class}, new RemoteParam());
-    rpcClient.invokeSequential(locations, method);
+    clientProto.removeDefaultAcl(src);
   }
 
   @Override // ClientProtocol
   public void removeAcl(String src) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("removeAcl",
-        new Class<?>[] {String.class}, new RemoteParam());
-    rpcClient.invokeSequential(locations, method);
+    clientProto.removeAcl(src);
   }
 
   @Override // ClientProtocol
   public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod(
-        "setAcl", new Class<?>[] {String.class, List.class},
-        new RemoteParam(), aclSpec);
-    rpcClient.invokeSequential(locations, method);
+    clientProto.setAcl(src, aclSpec);
   }
 
   @Override // ClientProtocol
   public AclStatus getAclStatus(String src) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, false);
-    RemoteMethod method = new RemoteMethod("getAclStatus",
-        new Class<?>[] {String.class}, new RemoteParam());
-    return (AclStatus) rpcClient.invokeSequential(
-        locations, method, AclStatus.class, null);
+    return clientProto.getAclStatus(src);
   }
 
   @Override // ClientProtocol
   public void createEncryptionZone(String src, String keyName)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("createEncryptionZone",
-        new Class<?>[] {String.class, String.class},
-        new RemoteParam(), keyName);
-    rpcClient.invokeSequential(locations, method);
+    clientProto.createEncryptionZone(src, keyName);
   }
 
   @Override // ClientProtocol
   public EncryptionZone getEZForPath(String src) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, false);
-    RemoteMethod method = new RemoteMethod("getEZForPath",
-        new Class<?>[] {String.class}, new RemoteParam());
-    return (EncryptionZone) rpcClient.invokeSequential(
-        locations, method, EncryptionZone.class, null);
+    return clientProto.getEZForPath(src);
   }
 
   @Override // ClientProtocol
   public BatchedEntries<EncryptionZone> listEncryptionZones(long prevId)
       throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.listEncryptionZones(prevId);
   }
 
   @Override // ClientProtocol
   public void reencryptEncryptionZone(String zone, ReencryptAction action)
       throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.reencryptEncryptionZone(zone, action);
   }
 
   @Override // ClientProtocol
   public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(
       long prevId) throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.listReencryptionStatus(prevId);
   }
 
   @Override // ClientProtocol
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("setXAttr",
-        new Class<?>[] {String.class, XAttr.class, EnumSet.class},
-        new RemoteParam(), xAttr, flag);
-    rpcClient.invokeSequential(locations, method);
+    clientProto.setXAttr(src, xAttr, flag);
   }
 
-  @SuppressWarnings("unchecked")
   @Override // ClientProtocol
   public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
       throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, false);
-    RemoteMethod method = new RemoteMethod("getXAttrs",
-        new Class<?>[] {String.class, List.class}, new RemoteParam(), xAttrs);
-    return (List<XAttr>) rpcClient.invokeSequential(
-        locations, method, List.class, null);
+    return clientProto.getXAttrs(src, xAttrs);
   }
 
-  @SuppressWarnings("unchecked")
   @Override // ClientProtocol
   public List<XAttr> listXAttrs(String src) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, false);
-    RemoteMethod method = new RemoteMethod("listXAttrs",
-        new Class<?>[] {String.class}, new RemoteParam());
-    return (List<XAttr>) rpcClient.invokeSequential(
-        locations, method, List.class, null);
+    return clientProto.listXAttrs(src);
   }
 
   @Override // ClientProtocol
   public void removeXAttr(String src, XAttr xAttr) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("removeXAttr",
-        new Class<?>[] {String.class, XAttr.class}, new RemoteParam(), xAttr);
-    rpcClient.invokeSequential(locations, method);
+    clientProto.removeXAttr(src, xAttr);
   }
 
   @Override // ClientProtocol
   public void checkAccess(String path, FsAction mode) throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    // TODO handle virtual directories
-    final List<RemoteLocation> locations = getLocationsForPath(path, true);
-    RemoteMethod method = new RemoteMethod("checkAccess",
-        new Class<?>[] {String.class, FsAction.class},
-        new RemoteParam(), mode);
-    rpcClient.invokeSequential(locations, method);
+    clientProto.checkAccess(path, mode);
   }
 
   @Override // ClientProtocol
   public long getCurrentEditLogTxid() throws IOException {
-    checkOperation(OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod(
-        "getCurrentEditLogTxid", new Class<?>[] {});
-    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Long> ret =
-        rpcClient.invokeConcurrent(nss, method, true, false, long.class);
-
-    // Return the maximum txid
-    long txid = 0;
-    for (long t : ret.values()) {
-      if (t > txid) {
-        txid = t;
-      }
-    }
-    return txid;
+    return clientProto.getCurrentEditLogTxid();
   }
 
   @Override // ClientProtocol
   public EventBatchList getEditsFromTxid(long txid) throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.getEditsFromTxid(txid);
   }
 
-  @Override
+  @Override // ClientProtocol
   public DataEncryptionKey getDataEncryptionKey() throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.getDataEncryptionKey();
   }
 
-  @Override
+  @Override // ClientProtocol
   public String createSnapshot(String snapshotRoot, String snapshotName)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-    return null;
+    return clientProto.createSnapshot(snapshotRoot, snapshotName);
   }
 
-  @Override
+  @Override // ClientProtocol
   public void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.deleteSnapshot(snapshotRoot, snapshotName);
   }
 
   @Override // ClientProtocol
   public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
       StorageType type) throws IOException {
-    this.quotaCall.setQuota(path, namespaceQuota, storagespaceQuota, type);
+    clientProto.setQuota(path, namespaceQuota, storagespaceQuota, type);
   }
 
   @Override // ClientProtocol
   public QuotaUsage getQuotaUsage(String path) throws IOException {
-    return this.quotaCall.getQuotaUsage(path);
+    return clientProto.getQuotaUsage(path);
   }
 
-  @Override
+  @Override // ClientProtocol
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // Block pool id -> blocks
-    Map<String, List<LocatedBlock>> blockLocations = new HashMap<>();
-    for (LocatedBlock block : blocks) {
-      String bpId = block.getBlock().getBlockPoolId();
-      List<LocatedBlock> bpBlocks = blockLocations.get(bpId);
-      if (bpBlocks == null) {
-        bpBlocks = new LinkedList<>();
-        blockLocations.put(bpId, bpBlocks);
-      }
-      bpBlocks.add(block);
-    }
-
-    // Invoke each block pool
-    for (Entry<String, List<LocatedBlock>> entry : blockLocations.entrySet()) {
-      String bpId = entry.getKey();
-      List<LocatedBlock> bpBlocks = entry.getValue();
-
-      LocatedBlock[] bpBlocksArray =
-          bpBlocks.toArray(new LocatedBlock[bpBlocks.size()]);
-      RemoteMethod method = new RemoteMethod("reportBadBlocks",
-          new Class<?>[] {LocatedBlock[].class},
-          new Object[] {bpBlocksArray});
-      rpcClient.invokeSingleBlockPool(bpId, method);
-    }
+    clientProto.reportBadBlocks(blocks);
   }
 
-  @Override
+  @Override // ClientProtocol
   public void unsetStoragePolicy(String src) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
+    clientProto.unsetStoragePolicy(src);
   }
 
-  @Override
+  @Override // ClientProtocol
   public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.getStoragePolicy(path);
   }
 
   @Override // ClientProtocol
   public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
       throws IOException {
-    return erasureCoding.getErasureCodingPolicies();
+    return clientProto.getErasureCodingPolicies();
   }
 
   @Override // ClientProtocol
   public Map<String, String> getErasureCodingCodecs() throws IOException {
-    return erasureCoding.getErasureCodingCodecs();
+    return clientProto.getErasureCodingCodecs();
   }
 
   @Override // ClientProtocol
   public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
       ErasureCodingPolicy[] policies) throws IOException {
-    return erasureCoding.addErasureCodingPolicies(policies);
+    return clientProto.addErasureCodingPolicies(policies);
   }
 
   @Override // ClientProtocol
   public void removeErasureCodingPolicy(String ecPolicyName)
       throws IOException {
-    erasureCoding.removeErasureCodingPolicy(ecPolicyName);
+    clientProto.removeErasureCodingPolicy(ecPolicyName);
   }
 
   @Override // ClientProtocol
   public void disableErasureCodingPolicy(String ecPolicyName)
       throws IOException {
-    erasureCoding.disableErasureCodingPolicy(ecPolicyName);
+    clientProto.disableErasureCodingPolicy(ecPolicyName);
   }
 
   @Override // ClientProtocol
   public void enableErasureCodingPolicy(String ecPolicyName)
       throws IOException {
-    erasureCoding.enableErasureCodingPolicy(ecPolicyName);
+    clientProto.enableErasureCodingPolicy(ecPolicyName);
   }
 
   @Override // ClientProtocol
   public ErasureCodingPolicy getErasureCodingPolicy(String src)
       throws IOException {
-    return erasureCoding.getErasureCodingPolicy(src);
+    return clientProto.getErasureCodingPolicy(src);
   }
 
   @Override // ClientProtocol
   public void setErasureCodingPolicy(String src, String ecPolicyName)
       throws IOException {
-    erasureCoding.setErasureCodingPolicy(src, ecPolicyName);
+    clientProto.setErasureCodingPolicy(src, ecPolicyName);
   }
 
   @Override // ClientProtocol
   public void unsetErasureCodingPolicy(String src) throws IOException {
-    erasureCoding.unsetErasureCodingPolicy(src);
+    clientProto.unsetErasureCodingPolicy(src);
   }
 
-  @Override
+  @Override // ClientProtocol
   public ECBlockGroupStats getECBlockGroupStats() throws IOException {
-    return erasureCoding.getECBlockGroupStats();
+    return clientProto.getECBlockGroupStats();
   }
 
-  @Override
+  @Override // ClientProtocol
   public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.getReplicatedBlockStats();
   }
 
   @Deprecated
-  @Override
+  @Override // ClientProtocol
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
-    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
-        OpenFilesIterator.FILTER_PATH_DEFAULT);
+    return clientProto.listOpenFiles(prevId);
   }
 
-  @Override
+  @Override // ClientProtocol
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    return clientProto.listOpenFiles(prevId, openFilesTypes, path);
+  }
+
+  @Override // ClientProtocol
+  public void satisfyStoragePolicy(String path) throws IOException {
+    clientProto.satisfyStoragePolicy(path);
   }
 
   @Override // NamenodeProtocol
@@ -2167,6 +1266,11 @@ public class RouterRpcServer extends AbstractService
     return nnProto.isRollingUpgrade();
   }
 
+  @Override // NamenodeProtocol
+  public Long getNextSPSPath() throws IOException {
+    return nnProto.getNextSPSPath();
+  }
+
   /**
    * Locate the location with the matching block pool id.
    *
@@ -2176,7 +1280,7 @@ public class RouterRpcServer extends AbstractService
    * @return Prioritized list of locations in the federated cluster.
    * @throws IOException if the location for this path cannot be determined.
    */
-  private RemoteLocation getLocationForPath(
+  protected RemoteLocation getLocationForPath(
       String path, boolean failIfLocked, String blockPoolId)
           throws IOException {
 
@@ -2276,27 +1380,6 @@ public class RouterRpcServer extends AbstractService
   }
 
   /**
-   * Check if a path should be in all subclusters.
-   *
-   * @param path Path to check.
-   * @return If a path should be in all subclusters.
-   */
-  private boolean isPathAll(final String path) {
-    if (subclusterResolver instanceof MountTableResolver) {
-      try {
-        MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
-        MountTable entry = mountTable.getMountPoint(path);
-        if (entry != null) {
-          return entry.isAll();
-        }
-      } catch (IOException e) {
-        LOG.error("Cannot get mount point", e);
-      }
-    }
-    return false;
-  }
-
-  /**
    * Check if a path is in a read only mount point.
    *
    * @param path Path to check.
@@ -2318,121 +1401,6 @@ public class RouterRpcServer extends AbstractService
   }
 
   /**
-   * Get the modification dates for mount points.
-   *
-   * @param path Name of the path to start checking dates from.
-   * @return Map with the modification dates for all sub-entries.
-   */
-  private Map<String, Long> getMountPointDates(String path) {
-    Map<String, Long> ret = new TreeMap<>();
-    if (subclusterResolver instanceof MountTableResolver) {
-      try {
-        final List<String> children = subclusterResolver.getMountPoints(path);
-        for (String child : children) {
-          Long modTime = getModifiedTime(ret, path, child);
-          ret.put(child, modTime);
-        }
-      } catch (IOException e) {
-        LOG.error("Cannot get mount point", e);
-      }
-    }
-    return ret;
-  }
-
-  /**
-   * Get modified time for child. If the child is present in mount table it
-   * will return the modified time. If the child is not present but subdirs of
-   * this child are present then it will return latest modified subdir's time
-   * as modified time of the requested child.
-   * @param ret contains children and modified times.
-   * @param mountTable.
-   * @param path Name of the path to start checking dates from.
-   * @param child child of the requested path.
-   * @return modified time.
-   */
-  private long getModifiedTime(Map<String, Long> ret, String path,
-      String child) {
-    MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
-    String srcPath;
-    if (path.equals(Path.SEPARATOR)) {
-      srcPath = Path.SEPARATOR + child;
-    } else {
-      srcPath = path + Path.SEPARATOR + child;
-    }
-    Long modTime = 0L;
-    try {
-      // Get mount table entry for the srcPath
-      MountTable entry = mountTable.getMountPoint(srcPath);
-      // if srcPath is not in mount table but its subdirs are in mount
-      // table we will display latest modified subdir date/time.
-      if (entry == null) {
-        List<MountTable> entries = mountTable.getMounts(srcPath);
-        for (MountTable eachEntry : entries) {
-          // Get the latest date
-          if (ret.get(child) == null ||
-              ret.get(child) < eachEntry.getDateModified()) {
-            modTime = eachEntry.getDateModified();
-          }
-        }
-      } else {
-        modTime = entry.getDateModified();
-      }
-    } catch (IOException e) {
-      LOG.error("Cannot get mount point", e);
-    }
-    return modTime;
-  }
-
-  /**
-   * Create a new file status for a mount point.
-   *
-   * @param name Name of the mount point.
-   * @param childrenNum Number of children.
-   * @param date Map with the dates.
-   * @return New HDFS file status representing a mount point.
-   */
-  private HdfsFileStatus getMountPointStatus(
-      String name, int childrenNum, long date) {
-    long modTime = date;
-    long accessTime = date;
-    FsPermission permission = FsPermission.getDirDefault();
-    String owner = this.superUser;
-    String group = this.superGroup;
-    try {
-      // TODO support users, it should be the user for the pointed folder
-      UserGroupInformation ugi = getRemoteUser();
-      owner = ugi.getUserName();
-      group = ugi.getPrimaryGroupName();
-    } catch (IOException e) {
-      LOG.error("Cannot get the remote user: {}", e.getMessage());
-    }
-    long inodeId = 0;
-    return new HdfsFileStatus.Builder()
-      .isdir(true)
-      .mtime(modTime)
-      .atime(accessTime)
-      .perm(permission)
-      .owner(owner)
-      .group(group)
-      .symlink(new byte[0])
-      .path(DFSUtil.string2Bytes(name))
-      .fileId(inodeId)
-      .children(childrenNum)
-      .build();
-  }
-
-  /**
-   * Get the name of the method that is calling this function.
-   *
-   * @return Name of the method calling this function.
-   */
-  private static String getMethodName() {
-    final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
-    String methodName = stack[3].getMethodName();
-    return methodName;
-  }
-
-  /**
    * Get the user that is invoking this operation.
    *
    * @return Remote user group information.
@@ -2490,16 +1458,4 @@ public class RouterRpcServer extends AbstractService
   public FederationRPCMetrics getRPCMetrics() {
     return this.rpcMonitor.getRPCMetrics();
   }
-
-  @Override
-  public void satisfyStoragePolicy(String path) throws IOException {
-    checkOperation(OperationCategory.WRITE, false);
-  }
-
-  @Override
-  public Long getNextSPSPath() throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    // not supported
-    return null;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to