http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java
deleted file mode 100644
index 9d81dce..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
-import org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Class that helps in checking permissions in Router-based federation.
- */
-public class RouterPermissionChecker extends FSPermissionChecker {
-  static final Log LOG = LogFactory.getLog(RouterPermissionChecker.class);
-
-  /** Mount table default permission. */
-  public static final short MOUNT_TABLE_PERMISSION_DEFAULT = 00755;
-
-  public RouterPermissionChecker(String routerOwner, String supergroup,
-      UserGroupInformation callerUgi) {
-    super(routerOwner, supergroup, callerUgi, null);
-  }
-
-  /**
-   * Whether a mount table entry can be accessed by the current context.
-   *
-   * @param mountTable
-   *          MountTable being accessed
-   * @param access
-   *          type of action being performed on the cache pool
-   * @throws AccessControlException
-   *           if mount table cannot be accessed
-   */
-  public void checkPermission(MountTable mountTable, FsAction access)
-      throws AccessControlException {
-    if (isSuperUser()) {
-      return;
-    }
-
-    FsPermission mode = mountTable.getMode();
-    if (getUser().equals(mountTable.getOwnerName())
-        && mode.getUserAction().implies(access)) {
-      return;
-    }
-
-    if (isMemberOfGroup(mountTable.getGroupName())
-        && mode.getGroupAction().implies(access)) {
-      return;
-    }
-
-    if (!getUser().equals(mountTable.getOwnerName())
-        && !isMemberOfGroup(mountTable.getGroupName())
-        && mode.getOtherAction().implies(access)) {
-      return;
-    }
-
-    throw new AccessControlException(
-        "Permission denied while accessing mount table "
-            + mountTable.getSourcePath()
-            + ": user " + getUser() + " does not have " + access.toString()
-            + " permissions.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
deleted file mode 100644
index d3b7947..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ /dev/null
@@ -1,1022 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import 
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
-import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
-import 
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
-import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.StandbyException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * A client proxy for Router -> NN communication using the NN ClientProtocol.
- * <p>
- * Provides routers to invoke remote ClientProtocol methods and handle
- * retries/failover.
- * <ul>
- * <li>invokeSingle Make a single request to a single namespace
- * <li>invokeSequential Make a sequential series of requests to multiple
- * ordered namespaces until a condition is met.
- * <li>invokeConcurrent Make concurrent requests to multiple namespaces and
- * return all of the results.
- * </ul>
- * Also maintains a cached pool of connections to NNs. Connections are managed
- * by the ConnectionManager and are unique to each user + NN. The size of the
- * connection pool can be configured. Larger pools allow for more simultaneous
- * requests to a single NN from a single user.
- */
-public class RouterRpcClient {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RouterRpcClient.class);
-
-
-  /** Router identifier. */
-  private final String routerId;
-
-  /** Interface to identify the active NN for a nameservice or blockpool ID. */
-  private final ActiveNamenodeResolver namenodeResolver;
-
-  /** Connection pool to the Namenodes per user for performance. */
-  private final ConnectionManager connectionManager;
-  /** Service to run asynchronous calls. */
-  private final ExecutorService executorService;
-  /** Retry policy for router -> NN communication. */
-  private final RetryPolicy retryPolicy;
-  /** Optional perf monitor. */
-  private final RouterRpcMonitor rpcMonitor;
-
-  /** Pattern to parse a stack trace line. */
-  private static final Pattern STACK_TRACE_PATTERN =
-      Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
-
-
-  /**
-   * Create a router RPC client to manage remote procedure calls to NNs.
-   *
-   * @param conf Hdfs Configuation.
-   * @param resolver A NN resolver to determine the currently active NN in HA.
-   * @param monitor Optional performance monitor.
-   */
-  public RouterRpcClient(Configuration conf, String identifier,
-      ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) {
-    this.routerId = identifier;
-
-    this.namenodeResolver = resolver;
-
-    this.connectionManager = new ConnectionManager(conf);
-    this.connectionManager.start();
-
-    int numThreads = conf.getInt(
-        DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
-        DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
-    ThreadFactory threadFactory = new ThreadFactoryBuilder()
-        .setNameFormat("RPC Router Client-%d")
-        .build();
-    this.executorService = Executors.newFixedThreadPool(
-        numThreads, threadFactory);
-
-    this.rpcMonitor = monitor;
-
-    int maxFailoverAttempts = conf.getInt(
-        HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
-        HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
-    int maxRetryAttempts = conf.getInt(
-        DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS,
-        DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT);
-    int failoverSleepBaseMillis = conf.getInt(
-        HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
-        HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
-    int failoverSleepMaxMillis = conf.getInt(
-        HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
-        HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
-    this.retryPolicy = RetryPolicies.failoverOnNetworkException(
-        RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, 
maxRetryAttempts,
-        failoverSleepBaseMillis, failoverSleepMaxMillis);
-  }
-
-  /**
-   * Get the active namenode resolver used by this client.
-   * @return Active namenode resolver.
-   */
-  public ActiveNamenodeResolver getNamenodeResolver() {
-    return this.namenodeResolver;
-  }
-
-  /**
-   * Shutdown the client.
-   */
-  public void shutdown() {
-    if (this.connectionManager != null) {
-      this.connectionManager.close();
-    }
-    if (this.executorService != null) {
-      this.executorService.shutdownNow();
-    }
-  }
-
-  /**
-   * Total number of available sockets between the router and NNs.
-   *
-   * @return Number of namenode clients.
-   */
-  public int getNumConnections() {
-    return this.connectionManager.getNumConnections();
-  }
-
-  /**
-   * Total number of available sockets between the router and NNs.
-   *
-   * @return Number of namenode clients.
-   */
-  public int getNumActiveConnections() {
-    return this.connectionManager.getNumActiveConnections();
-  }
-
-  /**
-   * Total number of open connection pools to a NN. Each connection pool.
-   * represents one user + one NN.
-   *
-   * @return Number of connection pools.
-   */
-  public int getNumConnectionPools() {
-    return this.connectionManager.getNumConnectionPools();
-  }
-
-  /**
-   * Number of connections between the router and NNs being created sockets.
-   *
-   * @return Number of connections waiting to be created.
-   */
-  public int getNumCreatingConnections() {
-    return this.connectionManager.getNumCreatingConnections();
-  }
-
-  /**
-   * JSON representation of the connection pool.
-   *
-   * @return String representation of the JSON.
-   */
-  public String getJSON() {
-    return this.connectionManager.getJSON();
-  }
-
-  /**
-   * Get ClientProtocol proxy client for a NameNode. Each combination of user +
-   * NN must use a unique proxy client. Previously created clients are cached
-   * and stored in a connection pool by the ConnectionManager.
-   *
-   * @param ugi User group information.
-   * @param nsId Nameservice identifier.
-   * @param rpcAddress ClientProtocol RPC server address of the NN.
-   * @return ConnectionContext containing a ClientProtocol proxy client for the
-   *         NN + current user.
-   * @throws IOException If we cannot get a connection to the NameNode.
-   */
-  private ConnectionContext getConnection(
-      UserGroupInformation ugi, String nsId, String rpcAddress)
-          throws IOException {
-    ConnectionContext connection = null;
-    try {
-      // Each proxy holds the UGI info for the current user when it is created.
-      // This cache does not scale very well, one entry per user per namenode,
-      // and may need to be adjusted and/or selectively pruned. The cache is
-      // important due to the excessive overhead of creating a new proxy 
wrapper
-      // for each individual request.
-
-      // TODO Add tokens from the federated UGI
-      connection = this.connectionManager.getConnection(ugi, rpcAddress);
-      LOG.debug("User {} NN {} is using connection {}",
-          ugi.getUserName(), rpcAddress, connection);
-    } catch (Exception ex) {
-      LOG.error("Cannot open NN client to address: {}", rpcAddress, ex);
-    }
-
-    if (connection == null) {
-      throw new IOException("Cannot get a connection to " + rpcAddress);
-    }
-    return connection;
-  }
-
-  /**
-   * Convert an exception to an IOException.
-   *
-   * For a non-IOException, wrap it with IOException. For a RemoteException,
-   * unwrap it. For an IOException which is not a RemoteException, return it.
-   *
-   * @param e Exception to convert into an exception.
-   * @return Created IO exception.
-   */
-  private static IOException toIOException(Exception e) {
-    if (e instanceof RemoteException) {
-      return ((RemoteException) e).unwrapRemoteException();
-    }
-    if (e instanceof IOException) {
-      return (IOException)e;
-    }
-    return new IOException(e);
-  }
-
-  /**
-   * If we should retry the RPC call.
-   *
-   * @param ioe IOException reported.
-   * @param retryCount Number of retries.
-   * @param nsId Nameservice ID.
-   * @return Retry decision.
-   * @throws IOException Original exception if the retry policy generates one
-   *                     or IOException for no available namenodes.
-   */
-  private RetryDecision shouldRetry(final IOException ioe, final int 
retryCount,
-      final String nsId) throws IOException {
-    // check for the case of cluster unavailable state
-    if (isClusterUnAvailable(nsId)) {
-      // we allow to retry once if cluster is unavailable
-      if (retryCount == 0) {
-        return RetryDecision.RETRY;
-      } else {
-        throw new IOException("No namenode available under nameservice " + 
nsId,
-            ioe);
-      }
-    }
-
-    try {
-      final RetryPolicy.RetryAction a =
-          this.retryPolicy.shouldRetry(ioe, retryCount, 0, true);
-      return a.action;
-    } catch (Exception ex) {
-      LOG.error("Re-throwing API exception, no more retries", ex);
-      throw toIOException(ex);
-    }
-  }
-
-  /**
-   * Invokes a method against the ClientProtocol proxy server. If a standby
-   * exception is generated by the call to the client, retries using the
-   * alternate server.
-   *
-   * Re-throws exceptions generated by the remote RPC call as either
-   * RemoteException or IOException.
-   *
-   * @param ugi User group information.
-   * @param namenodes A prioritized list of namenodes within the same
-   *                  nameservice.
-   * @param method Remote ClientProtcol method to invoke.
-   * @param params Variable list of parameters matching the method.
-   * @return The result of invoking the method.
-   * @throws IOException
-   */
-  private Object invokeMethod(
-      final UserGroupInformation ugi,
-      final List<? extends FederationNamenodeContext> namenodes,
-      final Method method, final Object... params) throws IOException {
-
-    if (namenodes == null || namenodes.isEmpty()) {
-      throw new IOException("No namenodes to invoke " + method.getName() +
-          " with params " + Arrays.toString(params) + " from " + 
this.routerId);
-    }
-
-    Object ret = null;
-    if (rpcMonitor != null) {
-      rpcMonitor.proxyOp();
-    }
-    boolean failover = false;
-    Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
-    for (FederationNamenodeContext namenode : namenodes) {
-      ConnectionContext connection = null;
-      try {
-        String nsId = namenode.getNameserviceId();
-        String rpcAddress = namenode.getRpcAddress();
-        connection = this.getConnection(ugi, nsId, rpcAddress);
-        ProxyAndInfo<ClientProtocol> client = connection.getClient();
-        ClientProtocol proxy = client.getProxy();
-        ret = invoke(nsId, 0, method, proxy, params);
-        if (failover) {
-          // Success on alternate server, update
-          InetSocketAddress address = client.getAddress();
-          namenodeResolver.updateActiveNamenode(nsId, address);
-        }
-        if (this.rpcMonitor != null) {
-          this.rpcMonitor.proxyOpComplete(true);
-        }
-        return ret;
-      } catch (IOException ioe) {
-        ioes.put(namenode, ioe);
-        if (ioe instanceof StandbyException) {
-          // Fail over indicated by retry policy and/or NN
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpFailureStandby();
-          }
-          failover = true;
-        } else if (ioe instanceof RemoteException) {
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpComplete(true);
-          }
-          // RemoteException returned by NN
-          throw (RemoteException) ioe;
-        } else {
-          // Other communication error, this is a failure
-          // Communication retries are handled by the retry policy
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpFailureCommunicate();
-            this.rpcMonitor.proxyOpComplete(false);
-          }
-          throw ioe;
-        }
-      } finally {
-        if (connection != null) {
-          connection.release();
-        }
-      }
-    }
-    if (this.rpcMonitor != null) {
-      this.rpcMonitor.proxyOpComplete(false);
-    }
-
-    // All namenodes were unavailable or in standby
-    String msg = "No namenode available to invoke " + method.getName() + " " +
-        Arrays.toString(params);
-    LOG.error(msg);
-    for (Entry<FederationNamenodeContext, IOException> entry :
-        ioes.entrySet()) {
-      FederationNamenodeContext namenode = entry.getKey();
-      String nsId = namenode.getNameserviceId();
-      String nnId = namenode.getNamenodeId();
-      String addr = namenode.getRpcAddress();
-      IOException ioe = entry.getValue();
-      if (ioe instanceof StandbyException) {
-        LOG.error("{} {} at {} is in Standby", nsId, nnId, addr);
-      } else {
-        LOG.error("{} {} at {} error: \"{}\"",
-            nsId, nnId, addr, ioe.getMessage());
-      }
-    }
-    throw new StandbyException(msg);
-  }
-
-  /**
-   * Invokes a method on the designated object. Catches exceptions specific to
-   * the invocation.
-   *
-   * Re-throws exceptions generated by the remote RPC call as either
-   * RemoteException or IOException.
-   *
-   * @param nsId Identifier for the namespace
-   * @param retryCount Current retry times
-   * @param method Method to invoke
-   * @param obj Target object for the method
-   * @param params Variable parameters
-   * @return Response from the remote server
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private Object invoke(String nsId, int retryCount, final Method method,
-      final Object obj, final Object... params) throws IOException {
-    try {
-      return method.invoke(obj, params);
-    } catch (IllegalAccessException e) {
-      LOG.error("Unexpected exception while proxying API", e);
-      return null;
-    } catch (IllegalArgumentException e) {
-      LOG.error("Unexpected exception while proxying API", e);
-      return null;
-    } catch (InvocationTargetException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof IOException) {
-        IOException ioe = (IOException) cause;
-
-        // Check if we should retry.
-        RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
-        if (decision == RetryDecision.RETRY) {
-          if (this.rpcMonitor != null) {
-            this.rpcMonitor.proxyOpRetries();
-          }
-
-          // retry
-          return invoke(nsId, ++retryCount, method, obj, params);
-        } else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
-          // failover, invoker looks for standby exceptions for failover.
-          if (ioe instanceof StandbyException) {
-            throw ioe;
-          } else {
-            throw new StandbyException(ioe.getMessage());
-          }
-        } else {
-          if (ioe instanceof RemoteException) {
-            RemoteException re = (RemoteException) ioe;
-            ioe = re.unwrapRemoteException();
-            ioe = getCleanException(ioe);
-          }
-          throw ioe;
-        }
-      } else {
-        throw new IOException(e);
-      }
-    }
-  }
-
-  /**
-   * Check if the cluster of given nameservice id is available.
-   * @param nsId nameservice ID.
-   * @return
-   * @throws IOException
-   */
-  private boolean isClusterUnAvailable(String nsId) throws IOException {
-    List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
-        .getNamenodesForNameserviceId(nsId);
-
-    if (nnState != null) {
-      for (FederationNamenodeContext nnContext : nnState) {
-        // Once we find one NN is in active state, we assume this
-        // cluster is available.
-        if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) {
-          return false;
-        }
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * Get a clean copy of the exception. Sometimes the exceptions returned by 
the
-   * server contain the full stack trace in the message.
-   *
-   * @param ioe Exception to clean up.
-   * @return Copy of the original exception with a clean message.
-   */
-  private static IOException getCleanException(IOException ioe) {
-    IOException ret = null;
-
-    String msg = ioe.getMessage();
-    Throwable cause = ioe.getCause();
-    StackTraceElement[] stackTrace = ioe.getStackTrace();
-
-    // Clean the message by removing the stack trace
-    int index = msg.indexOf("\n");
-    if (index > 0) {
-      String[] msgSplit = msg.split("\n");
-      msg = msgSplit[0];
-
-      // Parse stack trace from the message
-      List<StackTraceElement> elements = new LinkedList<>();
-      for (int i=1; i<msgSplit.length; i++) {
-        String line = msgSplit[i];
-        Matcher matcher = STACK_TRACE_PATTERN.matcher(line);
-        if (matcher.find()) {
-          String declaringClass = matcher.group(1);
-          String methodName = matcher.group(2);
-          String fileName = matcher.group(3);
-          int lineNumber = Integer.parseInt(matcher.group(4));
-          StackTraceElement element = new StackTraceElement(
-              declaringClass, methodName, fileName, lineNumber);
-          elements.add(element);
-        }
-      }
-      stackTrace = elements.toArray(new StackTraceElement[elements.size()]);
-    }
-
-    // Create the new output exception
-    if (ioe instanceof RemoteException) {
-      RemoteException re = (RemoteException)ioe;
-      ret = new RemoteException(re.getClassName(), msg);
-    } else {
-      // Try the simple constructor and initialize the fields
-      Class<? extends IOException> ioeClass = ioe.getClass();
-      try {
-        Constructor<? extends IOException> constructor =
-            ioeClass.getDeclaredConstructor(String.class);
-        ret = constructor.newInstance(msg);
-      } catch (ReflectiveOperationException e) {
-        // If there are errors, just use the input one
-        LOG.error("Could not create exception {}", ioeClass.getSimpleName(), 
e);
-        ret = ioe;
-      }
-    }
-    if (ret != null) {
-      ret.initCause(cause);
-      ret.setStackTrace(stackTrace);
-    }
-
-    return ret;
-  }
-
-  /**
-   * Invokes a ClientProtocol method. Determines the target nameservice via a
-   * provided block.
-   *
-   * Re-throws exceptions generated by the remote RPC call as either
-   * RemoteException or IOException.
-   *
-   * @param block Block used to determine appropriate nameservice.
-   * @param method The remote method and parameters to invoke.
-   * @return The result of invoking the method.
-   * @throws IOException
-   */
-  public Object invokeSingle(final ExtendedBlock block, RemoteMethod method)
-      throws IOException {
-    String bpId = block.getBlockPoolId();
-    return invokeSingleBlockPool(bpId, method);
-  }
-
-  /**
-   * Invokes a ClientProtocol method. Determines the target nameservice using
-   * the block pool id.
-   *
-   * Re-throws exceptions generated by the remote RPC call as either
-   * RemoteException or IOException.
-   *
-   * @param bpId Block pool identifier.
-   * @param method The remote method and parameters to invoke.
-   * @return The result of invoking the method.
-   * @throws IOException
-   */
-  public Object invokeSingleBlockPool(final String bpId, RemoteMethod method)
-      throws IOException {
-    String nsId = getNameserviceForBlockPoolId(bpId);
-    return invokeSingle(nsId, method);
-  }
-
-  /**
-   * Invokes a ClientProtocol method against the specified namespace.
-   *
-   * Re-throws exceptions generated by the remote RPC call as either
-   * RemoteException or IOException.
-   *
-   * @param nsId Target namespace for the method.
-   * @param method The remote method and parameters to invoke.
-   * @return The result of invoking the method.
-   * @throws IOException
-   */
-  public Object invokeSingle(final String nsId, RemoteMethod method)
-      throws IOException {
-    UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
-    List<? extends FederationNamenodeContext> nns =
-        getNamenodesForNameservice(nsId);
-    RemoteLocationContext loc = new RemoteLocation(nsId, "/");
-    return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc));
-  }
-
-  /**
-   * Invokes a single proxy call for a single location.
-   *
-   * Re-throws exceptions generated by the remote RPC call as either
-   * RemoteException or IOException.
-   *
-   * @param location RemoteLocation to invoke.
-   * @param remoteMethod The remote method and parameters to invoke.
-   * @return The result of invoking the method if successful.
-   * @throws IOException
-   */
-  public Object invokeSingle(final RemoteLocationContext location,
-      RemoteMethod remoteMethod) throws IOException {
-    List<RemoteLocationContext> locations = 
Collections.singletonList(location);
-    return invokeSequential(locations, remoteMethod);
-  }
-
-  /**
-   * Invokes sequential proxy calls to different locations. Continues to invoke
-   * calls until a call returns without throwing a remote exception.
-   *
-   * @param locations List of locations/nameservices to call concurrently.
-   * @param remoteMethod The remote method and parameters to invoke.
-   * @return The result of the first successful call, or if no calls are
-   *         successful, the result of the last RPC call executed.
-   * @throws IOException if the success condition is not met and one of the RPC
-   *           calls generated a remote exception.
-   */
-  public Object invokeSequential(
-      final List<? extends RemoteLocationContext> locations,
-      final RemoteMethod remoteMethod) throws IOException {
-    return invokeSequential(locations, remoteMethod, null, null);
-  }
-
-  /**
-   * Invokes sequential proxy calls to different locations. Continues to invoke
-   * calls until the success condition is met, or until all locations have been
-   * attempted.
-   *
-   * The success condition may be specified by:
-   * <ul>
-   * <li>An expected result class
-   * <li>An expected result value
-   * </ul>
-   *
-   * If no expected result class/values are specified, the success condition is
-   * a call that does not throw a remote exception.
-   *
-   * @param locations List of locations/nameservices to call concurrently.
-   * @param remoteMethod The remote method and parameters to invoke.
-   * @param expectedResultClass In order to be considered a positive result, 
the
-   *          return type must be of this class.
-   * @param expectedResultValue In order to be considered a positive result, 
the
-   *          return value must equal the value of this object.
-   * @return The result of the first successful call, or if no calls are
-   *         successful, the result of the first RPC call executed.
-   * @throws IOException if the success condition is not met, return the first
-   *                     remote exception generated.
-   */
-  public <T> T invokeSequential(
-      final List<? extends RemoteLocationContext> locations,
-      final RemoteMethod remoteMethod, Class<T> expectedResultClass,
-      Object expectedResultValue) throws IOException {
-
-    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
-    final Method m = remoteMethod.getMethod();
-    IOException firstThrownException = null;
-    IOException lastThrownException = null;
-    Object firstResult = null;
-    // Invoke in priority order
-    for (final RemoteLocationContext loc : locations) {
-      String ns = loc.getNameserviceId();
-      List<? extends FederationNamenodeContext> namenodes =
-          getNamenodesForNameservice(ns);
-      try {
-        Object[] params = remoteMethod.getParams(loc);
-        Object result = invokeMethod(ugi, namenodes, m, params);
-        // Check if the result is what we expected
-        if (isExpectedClass(expectedResultClass, result) &&
-            isExpectedValue(expectedResultValue, result)) {
-          // Valid result, stop here
-          @SuppressWarnings("unchecked")
-          T ret = (T)result;
-          return ret;
-        }
-        if (firstResult == null) {
-          firstResult = result;
-        }
-      } catch (IOException ioe) {
-        // Record it and move on
-        lastThrownException = (IOException) ioe;
-        if (firstThrownException == null) {
-          firstThrownException = lastThrownException;
-        }
-      } catch (Exception e) {
-        // Unusual error, ClientProtocol calls always use IOException (or
-        // RemoteException). Re-wrap in IOException for compatibility with
-        // ClientProtcol.
-        LOG.error("Unexpected exception {} proxying {} to {}",
-            e.getClass(), m.getName(), ns, e);
-        lastThrownException = new IOException(
-            "Unexpected exception proxying API " + e.getMessage(), e);
-        if (firstThrownException == null) {
-          firstThrownException = lastThrownException;
-        }
-      }
-    }
-
-    if (firstThrownException != null) {
-      // re-throw the last exception thrown for compatibility
-      throw firstThrownException;
-    }
-    // Return the last result, whether it is the value we are looking for or a
-    @SuppressWarnings("unchecked")
-    T ret = (T)firstResult;
-    return ret;
-  }
-
-  /**
-   * Checks if a result matches the required result class.
-   *
-   * @param expectedClass Required result class, null to skip the check.
-   * @param clazz The result to check.
-   * @return True if the result is an instance of the required class or if the
-   *         expected class is null.
-   */
-  private static boolean isExpectedClass(Class<?> expectedClass, Object clazz) 
{
-    if (expectedClass == null) {
-      return true;
-    } else if (clazz == null) {
-      return false;
-    } else {
-      return expectedClass.isInstance(clazz);
-    }
-  }
-
-  /**
-   * Checks if a result matches the expected value.
-   *
-   * @param expectedValue The expected value, null to skip the check.
-   * @param value The result to check.
-   * @return True if the result is equals to the expected value or if the
-   *         expected value is null.
-   */
-  private static boolean isExpectedValue(Object expectedValue, Object value) {
-    if (expectedValue == null) {
-      return true;
-    } else if (value == null) {
-      return false;
-    } else {
-      return value.equals(expectedValue);
-    }
-  }
-
-  /**
-   * Invoke multiple concurrent proxy calls to different clients. Returns an
-   * array of results.
-   *
-   * Re-throws exceptions generated by the remote RPC call as either
-   * RemoteException or IOException.
-   *
-   * @param <T> The type of the remote location.
-   * @param locations List of remote locations to call concurrently.
-   * @param method The remote method and parameters to invoke.
-   * @param requireResponse If true an exception will be thrown if all calls do
-   *          not complete. If false exceptions are ignored and all data 
results
-   *          successfully received are returned.
-   * @param standby If the requests should go to the standby namenodes too.
-   * @throws IOException If all the calls throw an exception.
-   */
-  public <T extends RemoteLocationContext, R> void invokeConcurrent(
-      final Collection<T> locations, final RemoteMethod method,
-      boolean requireResponse, boolean standby) throws IOException {
-    invokeConcurrent(locations, method, requireResponse, standby, void.class);
-  }
-
-  /**
-   * Invokes multiple concurrent proxy calls to different clients. Returns an
-   * array of results.
-   *
-   * Re-throws exceptions generated by the remote RPC call as either
-   * RemoteException or IOException.
-   *
-   * @param <T> The type of the remote location.
-   * @param <R> The type of the remote method return.
-   * @param locations List of remote locations to call concurrently.
-   * @param method The remote method and parameters to invoke.
-   * @param requireResponse If true an exception will be thrown if all calls do
-   *          not complete. If false exceptions are ignored and all data 
results
-   *          successfully received are returned.
-   * @param standby If the requests should go to the standby namenodes too.
-   * @param clazz Type of the remote return type.
-   * @return Result of invoking the method per subcluster: nsId -> result.
-   * @throws IOException If requiredResponse=true and any of the calls throw an
-   *           exception.
-   */
-  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
-      final Collection<T> locations, final RemoteMethod method,
-      boolean requireResponse, boolean standby, Class<R> clazz)
-          throws IOException {
-    return invokeConcurrent(
-        locations, method, requireResponse, standby, -1, clazz);
-  }
-
-  /**
-   * Invokes multiple concurrent proxy calls to different clients. Returns an
-   * array of results.
-   *
-   * Re-throws exceptions generated by the remote RPC call as either
-   * RemoteException or IOException.
-   *
-   * @param <T> The type of the remote location.
-   * @param <R> The type of the remote method return.
-   * @param locations List of remote locations to call concurrently.
-   * @param method The remote method and parameters to invoke.
-   * @param requireResponse If true an exception will be thrown if all calls do
-   *          not complete. If false exceptions are ignored and all data 
results
-   *          successfully received are returned.
-   * @param standby If the requests should go to the standby namenodes too.
-   * @param timeOutMs Timeout for each individual call.
-   * @param clazz Type of the remote return type.
-   * @return Result of invoking the method per subcluster: nsId -> result.
-   * @throws IOException If requiredResponse=true and any of the calls throw an
-   *           exception.
-   */
-  @SuppressWarnings("unchecked")
-  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
-      final Collection<T> locations, final RemoteMethod method,
-      boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz)
-          throws IOException {
-
-    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
-    final Method m = method.getMethod();
-
-    if (locations.size() == 1) {
-      // Shortcut, just one call
-      T location = locations.iterator().next();
-      String ns = location.getNameserviceId();
-      final List<? extends FederationNamenodeContext> namenodes =
-          getNamenodesForNameservice(ns);
-      Object[] paramList = method.getParams(location);
-      Object result = invokeMethod(ugi, namenodes, m, paramList);
-      return Collections.singletonMap(location, clazz.cast(result));
-    }
-
-    List<T> orderedLocations = new LinkedList<>();
-    Set<Callable<Object>> callables = new HashSet<>();
-    for (final T location : locations) {
-      String nsId = location.getNameserviceId();
-      final List<? extends FederationNamenodeContext> namenodes =
-          getNamenodesForNameservice(nsId);
-      final Object[] paramList = method.getParams(location);
-      if (standby) {
-        // Call the objectGetter to all NNs (including standby)
-        for (final FederationNamenodeContext nn : namenodes) {
-          String nnId = nn.getNamenodeId();
-          final List<FederationNamenodeContext> nnList =
-              Collections.singletonList(nn);
-          T nnLocation = location;
-          if (location instanceof RemoteLocation) {
-            nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
-          }
-          orderedLocations.add(nnLocation);
-          callables.add(new Callable<Object>() {
-            public Object call() throws Exception {
-              return invokeMethod(ugi, nnList, m, paramList);
-            }
-          });
-        }
-      } else {
-        // Call the objectGetter in order of nameservices in the NS list
-        orderedLocations.add(location);
-        callables.add(new Callable<Object>() {
-          public Object call() throws Exception {
-            return invokeMethod(ugi, namenodes, m, paramList);
-          }
-        });
-      }
-    }
-
-    if (rpcMonitor != null) {
-      rpcMonitor.proxyOp();
-    }
-
-    try {
-      List<Future<Object>> futures = null;
-      if (timeOutMs > 0) {
-        futures = executorService.invokeAll(
-            callables, timeOutMs, TimeUnit.MILLISECONDS);
-      } else {
-        futures = executorService.invokeAll(callables);
-      }
-      Map<T, R> results = new TreeMap<>();
-      Map<T, IOException> exceptions = new TreeMap<>();
-      for (int i=0; i<futures.size(); i++) {
-        T location = orderedLocations.get(i);
-        try {
-          Future<Object> future = futures.get(i);
-          Object result = future.get();
-          results.put(location, clazz.cast(result));
-        } catch (CancellationException ce) {
-          T loc = orderedLocations.get(i);
-          String msg =
-              "Invocation to \"" + loc + "\" for \"" + method + "\" timed out";
-          LOG.error(msg);
-          IOException ioe = new IOException(msg);
-          exceptions.put(location, ioe);
-        } catch (ExecutionException ex) {
-          Throwable cause = ex.getCause();
-          LOG.debug("Canot execute {} in {}: {}",
-              m.getName(), location, cause.getMessage());
-
-          // Convert into IOException if needed
-          IOException ioe = null;
-          if (cause instanceof IOException) {
-            ioe = (IOException) cause;
-          } else {
-            ioe = new IOException("Unhandled exception while proxying API " +
-                m.getName() + ": " + cause.getMessage(), cause);
-          }
-
-          // Response from all servers required, use this error.
-          if (requireResponse) {
-            throw ioe;
-          }
-
-          // Store the exceptions
-          exceptions.put(location, ioe);
-        }
-      }
-
-      // Throw the exception for the first location if there are no results
-      if (results.isEmpty()) {
-        T location = orderedLocations.get(0);
-        IOException ioe = exceptions.get(location);
-        if (ioe != null) {
-          throw ioe;
-        }
-      }
-
-      return results;
-    } catch (InterruptedException ex) {
-      LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
-      throw new IOException(
-          "Unexpected error while invoking API " + ex.getMessage(), ex);
-    }
-  }
-
-  /**
-   * Get a prioritized list of NNs that share the same nameservice ID (in the
-   * same namespace). NNs that are reported as ACTIVE will be first in the 
list.
-   *
-   * @param nsId The nameservice ID for the namespace.
-   * @return A prioritized list of NNs to use for communication.
-   * @throws IOException If a NN cannot be located for the nameservice ID.
-   */
-  private List<? extends FederationNamenodeContext> getNamenodesForNameservice(
-      final String nsId) throws IOException {
-
-    final List<? extends FederationNamenodeContext> namenodes =
-        namenodeResolver.getNamenodesForNameserviceId(nsId);
-
-    if (namenodes == null || namenodes.isEmpty()) {
-      throw new IOException("Cannot locate a registered namenode for " + nsId +
-          " from " + this.routerId);
-    }
-    return namenodes;
-  }
-
-  /**
-   * Get a prioritized list of NNs that share the same block pool ID (in the
-   * same namespace). NNs that are reported as ACTIVE will be first in the 
list.
-   *
-   * @param bpId The blockpool ID for the namespace.
-   * @return A prioritized list of NNs to use for communication.
-   * @throws IOException If a NN cannot be located for the block pool ID.
-   */
-  private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
-      final String bpId) throws IOException {
-
-    List<? extends FederationNamenodeContext> namenodes =
-        namenodeResolver.getNamenodesForBlockPoolId(bpId);
-
-    if (namenodes == null || namenodes.isEmpty()) {
-      throw new IOException("Cannot locate a registered namenode for " + bpId +
-          " from " + this.routerId);
-    }
-    return namenodes;
-  }
-
-  /**
-   * Get the nameservice identifier for a block pool.
-   *
-   * @param bpId Identifier of the block pool.
-   * @return Nameservice identifier.
-   * @throws IOException If a NN cannot be located for the block pool ID.
-   */
-  private String getNameserviceForBlockPoolId(final String bpId)
-      throws IOException {
-    List<? extends FederationNamenodeContext> namenodes =
-        getNamenodesForBlockPoolId(bpId);
-    FederationNamenodeContext namenode = namenodes.get(0);
-    return namenode.getNameserviceId();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
deleted file mode 100644
index df9aa11..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
-
-/**
- * Metrics and monitoring interface for the router RPC server. Allows pluggable
- * diagnostics and monitoring services to be attached.
- */
-public interface RouterRpcMonitor {
-
-  /**
-   * Initialize the monitor.
-   * @param conf Configuration for the monitor.
-   * @param server RPC server.
-   * @param store State Store.
-   */
-  void init(
-      Configuration conf, RouterRpcServer server, StateStoreService store);
-
-  /**
-   * Get Router RPC metrics info.
-   * @return The instance of FederationRPCMetrics.
-   */
-  FederationRPCMetrics getRPCMetrics();
-
-  /**
-   * Close the monitor.
-   */
-  void close();
-
-  /**
-   * Start processing an operation on the Router.
-   */
-  void startOp();
-
-  /**
-   * Start proxying an operation to the Namenode.
-   * @return Id of the thread doing the proxying.
-   */
-  long proxyOp();
-
-  /**
-   * Mark a proxy operation as completed.
-   * @param success If the operation was successful.
-   */
-  void proxyOpComplete(boolean success);
-
-  /**
-   * Failed to proxy an operation to a Namenode because it was in standby.
-   */
-  void proxyOpFailureStandby();
-
-  /**
-   * Failed to proxy an operation to a Namenode because of an unexpected
-   * exception.
-   */
-  void proxyOpFailureCommunicate();
-
-  /**
-   * Failed to proxy an operation because it is not implemented.
-   */
-  void proxyOpNotImplemented();
-
-  /**
-   * Retry to proxy an operation to a Namenode because of an unexpected
-   * exception.
-   */
-  void proxyOpRetries();
-
-  /**
-   * If the Router cannot contact the State Store in an operation.
-   */
-  void routerFailureStateStore();
-
-  /**
-   * If the Router is in safe mode.
-   */
-  void routerFailureSafemode();
-
-  /**
-   * If a path is locked.
-   */
-  void routerFailureLocked();
-
-  /**
-   * If a path is in a read only mount point.
-   */
-  void routerFailureReadOnly();
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to