http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java deleted file mode 100644 index 9d81dce..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; -import org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * Class that helps in checking permissions in Router-based federation. - */ -public class RouterPermissionChecker extends FSPermissionChecker { - static final Log LOG = LogFactory.getLog(RouterPermissionChecker.class); - - /** Mount table default permission. */ - public static final short MOUNT_TABLE_PERMISSION_DEFAULT = 00755; - - public RouterPermissionChecker(String routerOwner, String supergroup, - UserGroupInformation callerUgi) { - super(routerOwner, supergroup, callerUgi, null); - } - - /** - * Whether a mount table entry can be accessed by the current context. 
- * - * @param mountTable - * MountTable being accessed - * @param access - * type of action being performed on the cache pool - * @throws AccessControlException - * if mount table cannot be accessed - */ - public void checkPermission(MountTable mountTable, FsAction access) - throws AccessControlException { - if (isSuperUser()) { - return; - } - - FsPermission mode = mountTable.getMode(); - if (getUser().equals(mountTable.getOwnerName()) - && mode.getUserAction().implies(access)) { - return; - } - - if (isMemberOfGroup(mountTable.getGroupName()) - && mode.getGroupAction().implies(access)) { - return; - } - - if (!getUser().equals(mountTable.getOwnerName()) - && !isMemberOfGroup(mountTable.getGroupName()) - && mode.getOtherAction().implies(access)) { - return; - } - - throw new AccessControlException( - "Permission denied while accessing mount table " - + mountTable.getSourcePath() - + ": user " + getUser() + " does not have " + access.toString() - + " permissions."); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java deleted file mode 100644 index d3b7947..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ /dev/null @@ -1,1022 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; -import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; -import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.StandbyException; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import 
org.slf4j.LoggerFactory; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * A client proxy for Router -> NN communication using the NN ClientProtocol. - * <p> - * Provides routers to invoke remote ClientProtocol methods and handle - * retries/failover. - * <ul> - * <li>invokeSingle Make a single request to a single namespace - * <li>invokeSequential Make a sequential series of requests to multiple - * ordered namespaces until a condition is met. - * <li>invokeConcurrent Make concurrent requests to multiple namespaces and - * return all of the results. - * </ul> - * Also maintains a cached pool of connections to NNs. Connections are managed - * by the ConnectionManager and are unique to each user + NN. The size of the - * connection pool can be configured. Larger pools allow for more simultaneous - * requests to a single NN from a single user. - */ -public class RouterRpcClient { - - private static final Logger LOG = - LoggerFactory.getLogger(RouterRpcClient.class); - - - /** Router identifier. */ - private final String routerId; - - /** Interface to identify the active NN for a nameservice or blockpool ID. */ - private final ActiveNamenodeResolver namenodeResolver; - - /** Connection pool to the Namenodes per user for performance. */ - private final ConnectionManager connectionManager; - /** Service to run asynchronous calls. */ - private final ExecutorService executorService; - /** Retry policy for router -> NN communication. */ - private final RetryPolicy retryPolicy; - /** Optional perf monitor. */ - private final RouterRpcMonitor rpcMonitor; - - /** Pattern to parse a stack trace line. */ - private static final Pattern STACK_TRACE_PATTERN = - Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)"); - - - /** - * Create a router RPC client to manage remote procedure calls to NNs. - * - * @param conf Hdfs Configuation. - * @param resolver A NN resolver to determine the currently active NN in HA. 
- * @param monitor Optional performance monitor. - */ - public RouterRpcClient(Configuration conf, String identifier, - ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { - this.routerId = identifier; - - this.namenodeResolver = resolver; - - this.connectionManager = new ConnectionManager(conf); - this.connectionManager.start(); - - int numThreads = conf.getInt( - DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, - DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT); - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("RPC Router Client-%d") - .build(); - this.executorService = Executors.newFixedThreadPool( - numThreads, threadFactory); - - this.rpcMonitor = monitor; - - int maxFailoverAttempts = conf.getInt( - HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, - HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); - int maxRetryAttempts = conf.getInt( - DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS, - DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT); - int failoverSleepBaseMillis = conf.getInt( - HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, - HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT); - int failoverSleepMaxMillis = conf.getInt( - HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY, - HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT); - this.retryPolicy = RetryPolicies.failoverOnNetworkException( - RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts, - failoverSleepBaseMillis, failoverSleepMaxMillis); - } - - /** - * Get the active namenode resolver used by this client. - * @return Active namenode resolver. - */ - public ActiveNamenodeResolver getNamenodeResolver() { - return this.namenodeResolver; - } - - /** - * Shutdown the client. - */ - public void shutdown() { - if (this.connectionManager != null) { - this.connectionManager.close(); - } - if (this.executorService != null) { - this.executorService.shutdownNow(); - } - } - - /** - * Total number of available sockets between the router and NNs. 
- * - * @return Number of namenode clients. - */ - public int getNumConnections() { - return this.connectionManager.getNumConnections(); - } - - /** - * Total number of available sockets between the router and NNs. - * - * @return Number of namenode clients. - */ - public int getNumActiveConnections() { - return this.connectionManager.getNumActiveConnections(); - } - - /** - * Total number of open connection pools to a NN. Each connection pool. - * represents one user + one NN. - * - * @return Number of connection pools. - */ - public int getNumConnectionPools() { - return this.connectionManager.getNumConnectionPools(); - } - - /** - * Number of connections between the router and NNs being created sockets. - * - * @return Number of connections waiting to be created. - */ - public int getNumCreatingConnections() { - return this.connectionManager.getNumCreatingConnections(); - } - - /** - * JSON representation of the connection pool. - * - * @return String representation of the JSON. - */ - public String getJSON() { - return this.connectionManager.getJSON(); - } - - /** - * Get ClientProtocol proxy client for a NameNode. Each combination of user + - * NN must use a unique proxy client. Previously created clients are cached - * and stored in a connection pool by the ConnectionManager. - * - * @param ugi User group information. - * @param nsId Nameservice identifier. - * @param rpcAddress ClientProtocol RPC server address of the NN. - * @return ConnectionContext containing a ClientProtocol proxy client for the - * NN + current user. - * @throws IOException If we cannot get a connection to the NameNode. - */ - private ConnectionContext getConnection( - UserGroupInformation ugi, String nsId, String rpcAddress) - throws IOException { - ConnectionContext connection = null; - try { - // Each proxy holds the UGI info for the current user when it is created. 
- // This cache does not scale very well, one entry per user per namenode, - // and may need to be adjusted and/or selectively pruned. The cache is - // important due to the excessive overhead of creating a new proxy wrapper - // for each individual request. - - // TODO Add tokens from the federated UGI - connection = this.connectionManager.getConnection(ugi, rpcAddress); - LOG.debug("User {} NN {} is using connection {}", - ugi.getUserName(), rpcAddress, connection); - } catch (Exception ex) { - LOG.error("Cannot open NN client to address: {}", rpcAddress, ex); - } - - if (connection == null) { - throw new IOException("Cannot get a connection to " + rpcAddress); - } - return connection; - } - - /** - * Convert an exception to an IOException. - * - * For a non-IOException, wrap it with IOException. For a RemoteException, - * unwrap it. For an IOException which is not a RemoteException, return it. - * - * @param e Exception to convert into an exception. - * @return Created IO exception. - */ - private static IOException toIOException(Exception e) { - if (e instanceof RemoteException) { - return ((RemoteException) e).unwrapRemoteException(); - } - if (e instanceof IOException) { - return (IOException)e; - } - return new IOException(e); - } - - /** - * If we should retry the RPC call. - * - * @param ioe IOException reported. - * @param retryCount Number of retries. - * @param nsId Nameservice ID. - * @return Retry decision. - * @throws IOException Original exception if the retry policy generates one - * or IOException for no available namenodes. 
- */ - private RetryDecision shouldRetry(final IOException ioe, final int retryCount, - final String nsId) throws IOException { - // check for the case of cluster unavailable state - if (isClusterUnAvailable(nsId)) { - // we allow to retry once if cluster is unavailable - if (retryCount == 0) { - return RetryDecision.RETRY; - } else { - throw new IOException("No namenode available under nameservice " + nsId, - ioe); - } - } - - try { - final RetryPolicy.RetryAction a = - this.retryPolicy.shouldRetry(ioe, retryCount, 0, true); - return a.action; - } catch (Exception ex) { - LOG.error("Re-throwing API exception, no more retries", ex); - throw toIOException(ex); - } - } - - /** - * Invokes a method against the ClientProtocol proxy server. If a standby - * exception is generated by the call to the client, retries using the - * alternate server. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param ugi User group information. - * @param namenodes A prioritized list of namenodes within the same - * nameservice. - * @param method Remote ClientProtcol method to invoke. - * @param params Variable list of parameters matching the method. - * @return The result of invoking the method. - * @throws IOException - */ - private Object invokeMethod( - final UserGroupInformation ugi, - final List<? extends FederationNamenodeContext> namenodes, - final Method method, final Object... 
params) throws IOException { - - if (namenodes == null || namenodes.isEmpty()) { - throw new IOException("No namenodes to invoke " + method.getName() + - " with params " + Arrays.toString(params) + " from " + this.routerId); - } - - Object ret = null; - if (rpcMonitor != null) { - rpcMonitor.proxyOp(); - } - boolean failover = false; - Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>(); - for (FederationNamenodeContext namenode : namenodes) { - ConnectionContext connection = null; - try { - String nsId = namenode.getNameserviceId(); - String rpcAddress = namenode.getRpcAddress(); - connection = this.getConnection(ugi, nsId, rpcAddress); - ProxyAndInfo<ClientProtocol> client = connection.getClient(); - ClientProtocol proxy = client.getProxy(); - ret = invoke(nsId, 0, method, proxy, params); - if (failover) { - // Success on alternate server, update - InetSocketAddress address = client.getAddress(); - namenodeResolver.updateActiveNamenode(nsId, address); - } - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true); - } - return ret; - } catch (IOException ioe) { - ioes.put(namenode, ioe); - if (ioe instanceof StandbyException) { - // Fail over indicated by retry policy and/or NN - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureStandby(); - } - failover = true; - } else if (ioe instanceof RemoteException) { - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true); - } - // RemoteException returned by NN - throw (RemoteException) ioe; - } else { - // Other communication error, this is a failure - // Communication retries are handled by the retry policy - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureCommunicate(); - this.rpcMonitor.proxyOpComplete(false); - } - throw ioe; - } - } finally { - if (connection != null) { - connection.release(); - } - } - } - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(false); - } - - // All namenodes were unavailable or in standby - 
String msg = "No namenode available to invoke " + method.getName() + " " + - Arrays.toString(params); - LOG.error(msg); - for (Entry<FederationNamenodeContext, IOException> entry : - ioes.entrySet()) { - FederationNamenodeContext namenode = entry.getKey(); - String nsId = namenode.getNameserviceId(); - String nnId = namenode.getNamenodeId(); - String addr = namenode.getRpcAddress(); - IOException ioe = entry.getValue(); - if (ioe instanceof StandbyException) { - LOG.error("{} {} at {} is in Standby", nsId, nnId, addr); - } else { - LOG.error("{} {} at {} error: \"{}\"", - nsId, nnId, addr, ioe.getMessage()); - } - } - throw new StandbyException(msg); - } - - /** - * Invokes a method on the designated object. Catches exceptions specific to - * the invocation. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param nsId Identifier for the namespace - * @param retryCount Current retry times - * @param method Method to invoke - * @param obj Target object for the method - * @param params Variable parameters - * @return Response from the remote server - * @throws IOException - * @throws InterruptedException - */ - private Object invoke(String nsId, int retryCount, final Method method, - final Object obj, final Object... params) throws IOException { - try { - return method.invoke(obj, params); - } catch (IllegalAccessException e) { - LOG.error("Unexpected exception while proxying API", e); - return null; - } catch (IllegalArgumentException e) { - LOG.error("Unexpected exception while proxying API", e); - return null; - } catch (InvocationTargetException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - IOException ioe = (IOException) cause; - - // Check if we should retry. 
- RetryDecision decision = shouldRetry(ioe, retryCount, nsId); - if (decision == RetryDecision.RETRY) { - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpRetries(); - } - - // retry - return invoke(nsId, ++retryCount, method, obj, params); - } else if (decision == RetryDecision.FAILOVER_AND_RETRY) { - // failover, invoker looks for standby exceptions for failover. - if (ioe instanceof StandbyException) { - throw ioe; - } else { - throw new StandbyException(ioe.getMessage()); - } - } else { - if (ioe instanceof RemoteException) { - RemoteException re = (RemoteException) ioe; - ioe = re.unwrapRemoteException(); - ioe = getCleanException(ioe); - } - throw ioe; - } - } else { - throw new IOException(e); - } - } - } - - /** - * Check if the cluster of given nameservice id is available. - * @param nsId nameservice ID. - * @return - * @throws IOException - */ - private boolean isClusterUnAvailable(String nsId) throws IOException { - List<? extends FederationNamenodeContext> nnState = this.namenodeResolver - .getNamenodesForNameserviceId(nsId); - - if (nnState != null) { - for (FederationNamenodeContext nnContext : nnState) { - // Once we find one NN is in active state, we assume this - // cluster is available. - if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) { - return false; - } - } - } - - return true; - } - - /** - * Get a clean copy of the exception. Sometimes the exceptions returned by the - * server contain the full stack trace in the message. - * - * @param ioe Exception to clean up. - * @return Copy of the original exception with a clean message. 
- */ - private static IOException getCleanException(IOException ioe) { - IOException ret = null; - - String msg = ioe.getMessage(); - Throwable cause = ioe.getCause(); - StackTraceElement[] stackTrace = ioe.getStackTrace(); - - // Clean the message by removing the stack trace - int index = msg.indexOf("\n"); - if (index > 0) { - String[] msgSplit = msg.split("\n"); - msg = msgSplit[0]; - - // Parse stack trace from the message - List<StackTraceElement> elements = new LinkedList<>(); - for (int i=1; i<msgSplit.length; i++) { - String line = msgSplit[i]; - Matcher matcher = STACK_TRACE_PATTERN.matcher(line); - if (matcher.find()) { - String declaringClass = matcher.group(1); - String methodName = matcher.group(2); - String fileName = matcher.group(3); - int lineNumber = Integer.parseInt(matcher.group(4)); - StackTraceElement element = new StackTraceElement( - declaringClass, methodName, fileName, lineNumber); - elements.add(element); - } - } - stackTrace = elements.toArray(new StackTraceElement[elements.size()]); - } - - // Create the new output exception - if (ioe instanceof RemoteException) { - RemoteException re = (RemoteException)ioe; - ret = new RemoteException(re.getClassName(), msg); - } else { - // Try the simple constructor and initialize the fields - Class<? extends IOException> ioeClass = ioe.getClass(); - try { - Constructor<? extends IOException> constructor = - ioeClass.getDeclaredConstructor(String.class); - ret = constructor.newInstance(msg); - } catch (ReflectiveOperationException e) { - // If there are errors, just use the input one - LOG.error("Could not create exception {}", ioeClass.getSimpleName(), e); - ret = ioe; - } - } - if (ret != null) { - ret.initCause(cause); - ret.setStackTrace(stackTrace); - } - - return ret; - } - - /** - * Invokes a ClientProtocol method. Determines the target nameservice via a - * provided block. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. 
- * - * @param block Block used to determine appropriate nameservice. - * @param method The remote method and parameters to invoke. - * @return The result of invoking the method. - * @throws IOException - */ - public Object invokeSingle(final ExtendedBlock block, RemoteMethod method) - throws IOException { - String bpId = block.getBlockPoolId(); - return invokeSingleBlockPool(bpId, method); - } - - /** - * Invokes a ClientProtocol method. Determines the target nameservice using - * the block pool id. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param bpId Block pool identifier. - * @param method The remote method and parameters to invoke. - * @return The result of invoking the method. - * @throws IOException - */ - public Object invokeSingleBlockPool(final String bpId, RemoteMethod method) - throws IOException { - String nsId = getNameserviceForBlockPoolId(bpId); - return invokeSingle(nsId, method); - } - - /** - * Invokes a ClientProtocol method against the specified namespace. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param nsId Target namespace for the method. - * @param method The remote method and parameters to invoke. - * @return The result of invoking the method. - * @throws IOException - */ - public Object invokeSingle(final String nsId, RemoteMethod method) - throws IOException { - UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - List<? extends FederationNamenodeContext> nns = - getNamenodesForNameservice(nsId); - RemoteLocationContext loc = new RemoteLocation(nsId, "/"); - return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc)); - } - - /** - * Invokes a single proxy call for a single location. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param location RemoteLocation to invoke. 
- * @param remoteMethod The remote method and parameters to invoke. - * @return The result of invoking the method if successful. - * @throws IOException - */ - public Object invokeSingle(final RemoteLocationContext location, - RemoteMethod remoteMethod) throws IOException { - List<RemoteLocationContext> locations = Collections.singletonList(location); - return invokeSequential(locations, remoteMethod); - } - - /** - * Invokes sequential proxy calls to different locations. Continues to invoke - * calls until a call returns without throwing a remote exception. - * - * @param locations List of locations/nameservices to call concurrently. - * @param remoteMethod The remote method and parameters to invoke. - * @return The result of the first successful call, or if no calls are - * successful, the result of the last RPC call executed. - * @throws IOException if the success condition is not met and one of the RPC - * calls generated a remote exception. - */ - public Object invokeSequential( - final List<? extends RemoteLocationContext> locations, - final RemoteMethod remoteMethod) throws IOException { - return invokeSequential(locations, remoteMethod, null, null); - } - - /** - * Invokes sequential proxy calls to different locations. Continues to invoke - * calls until the success condition is met, or until all locations have been - * attempted. - * - * The success condition may be specified by: - * <ul> - * <li>An expected result class - * <li>An expected result value - * </ul> - * - * If no expected result class/values are specified, the success condition is - * a call that does not throw a remote exception. - * - * @param locations List of locations/nameservices to call concurrently. - * @param remoteMethod The remote method and parameters to invoke. - * @param expectedResultClass In order to be considered a positive result, the - * return type must be of this class. 
- * @param expectedResultValue In order to be considered a positive result, the - * return value must equal the value of this object. - * @return The result of the first successful call, or if no calls are - * successful, the result of the first RPC call executed. - * @throws IOException if the success condition is not met, return the first - * remote exception generated. - */ - public <T> T invokeSequential( - final List<? extends RemoteLocationContext> locations, - final RemoteMethod remoteMethod, Class<T> expectedResultClass, - Object expectedResultValue) throws IOException { - - final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - final Method m = remoteMethod.getMethod(); - IOException firstThrownException = null; - IOException lastThrownException = null; - Object firstResult = null; - // Invoke in priority order - for (final RemoteLocationContext loc : locations) { - String ns = loc.getNameserviceId(); - List<? extends FederationNamenodeContext> namenodes = - getNamenodesForNameservice(ns); - try { - Object[] params = remoteMethod.getParams(loc); - Object result = invokeMethod(ugi, namenodes, m, params); - // Check if the result is what we expected - if (isExpectedClass(expectedResultClass, result) && - isExpectedValue(expectedResultValue, result)) { - // Valid result, stop here - @SuppressWarnings("unchecked") - T ret = (T)result; - return ret; - } - if (firstResult == null) { - firstResult = result; - } - } catch (IOException ioe) { - // Record it and move on - lastThrownException = (IOException) ioe; - if (firstThrownException == null) { - firstThrownException = lastThrownException; - } - } catch (Exception e) { - // Unusual error, ClientProtocol calls always use IOException (or - // RemoteException). Re-wrap in IOException for compatibility with - // ClientProtcol. 
- LOG.error("Unexpected exception {} proxying {} to {}", - e.getClass(), m.getName(), ns, e); - lastThrownException = new IOException( - "Unexpected exception proxying API " + e.getMessage(), e); - if (firstThrownException == null) { - firstThrownException = lastThrownException; - } - } - } - - if (firstThrownException != null) { - // re-throw the last exception thrown for compatibility - throw firstThrownException; - } - // Return the last result, whether it is the value we are looking for or a - @SuppressWarnings("unchecked") - T ret = (T)firstResult; - return ret; - } - - /** - * Checks if a result matches the required result class. - * - * @param expectedClass Required result class, null to skip the check. - * @param clazz The result to check. - * @return True if the result is an instance of the required class or if the - * expected class is null. - */ - private static boolean isExpectedClass(Class<?> expectedClass, Object clazz) { - if (expectedClass == null) { - return true; - } else if (clazz == null) { - return false; - } else { - return expectedClass.isInstance(clazz); - } - } - - /** - * Checks if a result matches the expected value. - * - * @param expectedValue The expected value, null to skip the check. - * @param value The result to check. - * @return True if the result is equals to the expected value or if the - * expected value is null. - */ - private static boolean isExpectedValue(Object expectedValue, Object value) { - if (expectedValue == null) { - return true; - } else if (value == null) { - return false; - } else { - return value.equals(expectedValue); - } - } - - /** - * Invoke multiple concurrent proxy calls to different clients. Returns an - * array of results. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param <T> The type of the remote location. - * @param locations List of remote locations to call concurrently. 
- * @param method The remote method and parameters to invoke. - * @param requireResponse If true an exception will be thrown if all calls do - * not complete. If false exceptions are ignored and all data results - * successfully received are returned. - * @param standby If the requests should go to the standby namenodes too. - * @throws IOException If all the calls throw an exception. - */ - public <T extends RemoteLocationContext, R> void invokeConcurrent( - final Collection<T> locations, final RemoteMethod method, - boolean requireResponse, boolean standby) throws IOException { - invokeConcurrent(locations, method, requireResponse, standby, void.class); - } - - /** - * Invokes multiple concurrent proxy calls to different clients. Returns an - * array of results. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param <T> The type of the remote location. - * @param <R> The type of the remote method return. - * @param locations List of remote locations to call concurrently. - * @param method The remote method and parameters to invoke. - * @param requireResponse If true an exception will be thrown if all calls do - * not complete. If false exceptions are ignored and all data results - * successfully received are returned. - * @param standby If the requests should go to the standby namenodes too. - * @param clazz Type of the remote return type. - * @return Result of invoking the method per subcluster: nsId -> result. - * @throws IOException If requiredResponse=true and any of the calls throw an - * exception. - */ - public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent( - final Collection<T> locations, final RemoteMethod method, - boolean requireResponse, boolean standby, Class<R> clazz) - throws IOException { - return invokeConcurrent( - locations, method, requireResponse, standby, -1, clazz); - } - - /** - * Invokes multiple concurrent proxy calls to different clients. 
Returns a - * map of results. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param <T> The type of the remote location. - * @param <R> The type of the remote method return. - * @param locations List of remote locations to call concurrently. - * @param method The remote method and parameters to invoke. - * @param requireResponse If true an exception will be thrown if all calls do - * not complete. If false exceptions are ignored and all data results - * successfully received are returned. - * @param standby If the requests should go to the standby namenodes too. - * @param timeOutMs Timeout for each individual call. - * @param clazz Type of the remote return type. - * @return Result of invoking the method per subcluster: nsId -> result. - * @throws IOException If requireResponse=true and any of the calls throw an - * exception. - */ - @SuppressWarnings("unchecked") - public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent( - final Collection<T> locations, final RemoteMethod method, - boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz) - throws IOException { - - final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - final Method m = method.getMethod(); - - if (locations.size() == 1) { - // Shortcut, just one call - T location = locations.iterator().next(); - String ns = location.getNameserviceId(); - final List<? extends FederationNamenodeContext> namenodes = - getNamenodesForNameservice(ns); - Object[] paramList = method.getParams(location); - Object result = invokeMethod(ugi, namenodes, m, paramList); - return Collections.singletonMap(location, clazz.cast(result)); - } - - List<T> orderedLocations = new LinkedList<>(); - Set<Callable<Object>> callables = new HashSet<>(); - for (final T location : locations) { - String nsId = location.getNameserviceId(); - final List<? 
extends FederationNamenodeContext> namenodes = - getNamenodesForNameservice(nsId); - final Object[] paramList = method.getParams(location); - if (standby) { - // Call the objectGetter to all NNs (including standby) - for (final FederationNamenodeContext nn : namenodes) { - String nnId = nn.getNamenodeId(); - final List<FederationNamenodeContext> nnList = - Collections.singletonList(nn); - T nnLocation = location; - if (location instanceof RemoteLocation) { - nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); - } - orderedLocations.add(nnLocation); - callables.add(new Callable<Object>() { - public Object call() throws Exception { - return invokeMethod(ugi, nnList, m, paramList); - } - }); - } - } else { - // Call the objectGetter in order of nameservices in the NS list - orderedLocations.add(location); - callables.add(new Callable<Object>() { - public Object call() throws Exception { - return invokeMethod(ugi, namenodes, m, paramList); - } - }); - } - } - - if (rpcMonitor != null) { - rpcMonitor.proxyOp(); - } - - try { - List<Future<Object>> futures = null; - if (timeOutMs > 0) { - futures = executorService.invokeAll( - callables, timeOutMs, TimeUnit.MILLISECONDS); - } else { - futures = executorService.invokeAll(callables); - } - Map<T, R> results = new TreeMap<>(); - Map<T, IOException> exceptions = new TreeMap<>(); - for (int i=0; i<futures.size(); i++) { - T location = orderedLocations.get(i); - try { - Future<Object> future = futures.get(i); - Object result = future.get(); - results.put(location, clazz.cast(result)); - } catch (CancellationException ce) { - T loc = orderedLocations.get(i); - String msg = - "Invocation to \"" + loc + "\" for \"" + method + "\" timed out"; - LOG.error(msg); - IOException ioe = new IOException(msg); - exceptions.put(location, ioe); - } catch (ExecutionException ex) { - Throwable cause = ex.getCause(); - LOG.debug("Canot execute {} in {}: {}", - m.getName(), location, cause.getMessage()); - - // Convert into 
IOException if needed - IOException ioe = null; - if (cause instanceof IOException) { - ioe = (IOException) cause; - } else { - ioe = new IOException("Unhandled exception while proxying API " + - m.getName() + ": " + cause.getMessage(), cause); - } - - // Response from all servers required, use this error. - if (requireResponse) { - throw ioe; - } - - // Store the exceptions - exceptions.put(location, ioe); - } - } - - // Throw the exception for the first location if there are no results - if (results.isEmpty()) { - T location = orderedLocations.get(0); - IOException ioe = exceptions.get(location); - if (ioe != null) { - throw ioe; - } - } - - return results; - } catch (InterruptedException ex) { - LOG.error("Unexpected error while invoking API: {}", ex.getMessage()); - throw new IOException( - "Unexpected error while invoking API " + ex.getMessage(), ex); - } - } - - /** - * Get a prioritized list of NNs that share the same nameservice ID (in the - * same namespace). NNs that are reported as ACTIVE will be first in the list. - * - * @param nsId The nameservice ID for the namespace. - * @return A prioritized list of NNs to use for communication. - * @throws IOException If a NN cannot be located for the nameservice ID. - */ - private List<? extends FederationNamenodeContext> getNamenodesForNameservice( - final String nsId) throws IOException { - - final List<? extends FederationNamenodeContext> namenodes = - namenodeResolver.getNamenodesForNameserviceId(nsId); - - if (namenodes == null || namenodes.isEmpty()) { - throw new IOException("Cannot locate a registered namenode for " + nsId + - " from " + this.routerId); - } - return namenodes; - } - - /** - * Get a prioritized list of NNs that share the same block pool ID (in the - * same namespace). NNs that are reported as ACTIVE will be first in the list. - * - * @param bpId The blockpool ID for the namespace. - * @return A prioritized list of NNs to use for communication. 
- * @throws IOException If a NN cannot be located for the block pool ID. - */ - private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId( - final String bpId) throws IOException { - - List<? extends FederationNamenodeContext> namenodes = - namenodeResolver.getNamenodesForBlockPoolId(bpId); - - if (namenodes == null || namenodes.isEmpty()) { - throw new IOException("Cannot locate a registered namenode for " + bpId + - " from " + this.routerId); - } - return namenodes; - } - - /** - * Get the nameservice identifier for a block pool. - * - * @param bpId Identifier of the block pool. - * @return Nameservice identifier. - * @throws IOException If a NN cannot be located for the block pool ID. - */ - private String getNameserviceForBlockPoolId(final String bpId) - throws IOException { - List<? extends FederationNamenodeContext> namenodes = - getNamenodesForBlockPoolId(bpId); - FederationNamenodeContext namenode = namenodes.get(0); - return namenode.getNameserviceId(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java deleted file mode 100644 index df9aa11..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; - -/** - * Metrics and monitoring interface for the router RPC server. Allows pluggable - * diagnostics and monitoring services to be attached. - */ -public interface RouterRpcMonitor { - - /** - * Initialize the monitor. - * @param conf Configuration for the monitor. - * @param server RPC server. - * @param store State Store. - */ - void init( - Configuration conf, RouterRpcServer server, StateStoreService store); - - /** - * Get Router RPC metrics info. - * @return The instance of FederationRPCMetrics. - */ - FederationRPCMetrics getRPCMetrics(); - - /** - * Close the monitor. - */ - void close(); - - /** - * Start processing an operation on the Router. - */ - void startOp(); - - /** - * Start proxying an operation to the Namenode. - * @return Id of the thread doing the proxying. - */ - long proxyOp(); - - /** - * Mark a proxy operation as completed. - * @param success If the operation was successful. - */ - void proxyOpComplete(boolean success); - - /** - * Failed to proxy an operation to a Namenode because it was in standby. 
- */ - void proxyOpFailureStandby(); - - /** - * Failed to proxy an operation to a Namenode because of an unexpected - * exception. - */ - void proxyOpFailureCommunicate(); - - /** - * Failed to proxy an operation because it is not implemented. - */ - void proxyOpNotImplemented(); - - /** - * Retry to proxy an operation to a Namenode because of an unexpected - * exception. - */ - void proxyOpRetries(); - - /** - * If the Router cannot contact the State Store in an operation. - */ - void routerFailureStateStore(); - - /** - * If the Router is in safe mode. - */ - void routerFailureSafemode(); - - /** - * If a path is locked. - */ - void routerFailureLocked(); - - /** - * If a path is in a read only mount point. - */ - void routerFailureReadOnly(); -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
