http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java deleted file mode 100644 index e94f69b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ /dev/null @@ -1,436 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements a pool of connections for the {@link Router} to be able to open
 * many connections to many Namenodes.
 *
 * <p>Pools are kept in a map keyed by {@link ConnectionPoolId} (user + NN) and
 * guarded by a read/write lock. New connections are created by a background
 * {@link ConnectionCreator} thread fed through a bounded queue, and a
 * scheduled {@link CleanupTask} closes stale pools and surplus connections.
 */
public class ConnectionManager {

  private static final Logger LOG =
      LoggerFactory.getLogger(ConnectionManager.class);

  /** Number of parallel new connections to create. */
  protected static final int MAX_NEW_CONNECTIONS = 100;

  /** Minimum amount of active connections: 50%. */
  protected static final float MIN_ACTIVE_RATIO = 0.5f;


  /** Configuration for the connection manager, pool and sockets. */
  private final Configuration conf;

  /** Min number of connections per user + nn. */
  private final int minSize = 1;
  /** Max number of connections per user + nn. */
  private final int maxSize;

  /** How often we close a pool for a particular user + nn. */
  private final long poolCleanupPeriodMs;
  /** How often we close a connection in a pool. */
  private final long connectionCleanupPeriodMs;

  /** Map of connection pools, one pool per user + NN. */
  private final Map<ConnectionPoolId, ConnectionPool> pools;
  /** Lock for accessing pools. */
  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  private final Lock readLock = readWriteLock.readLock();
  private final Lock writeLock = readWriteLock.writeLock();

  /** Queue for creating new connections. */
  private final BlockingQueue<ConnectionPool> creatorQueue =
      new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS);
  /** Create new connections asynchronously. */
  private final ConnectionCreator creator;
  /** Periodic executor to remove stale connection pools. */
  private final ScheduledThreadPoolExecutor cleaner =
      new ScheduledThreadPoolExecutor(1);

  /**
   * If the connection manager is running. Volatile because it is written by
   * the thread calling {@link #start()}/{@link #close()} and read by every
   * thread calling {@link #getConnection(UserGroupInformation, String)}.
   */
  private volatile boolean running = false;


  /**
   * Creates a proxy client connection pool manager.
   *
   * @param config Configuration for the connections.
   */
  public ConnectionManager(Configuration config) {
    this.conf = config;

    // Configure minimum and maximum connection pools
    this.maxSize = this.conf.getInt(
        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT);

    // Map with the connections indexed by UGI and Namenode
    this.pools = new HashMap<>();

    // Create connections in a thread asynchronously
    this.creator = new ConnectionCreator(creatorQueue);
    this.creator.setDaemon(true);

    // Cleanup periods
    this.poolCleanupPeriodMs = this.conf.getLong(
        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN,
        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT);
    LOG.info("Cleaning connection pools every {} seconds",
        TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs));
    this.connectionCleanupPeriodMs = this.conf.getLong(
        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS,
        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT);
    LOG.info("Cleaning connections every {} seconds",
        TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs));
  }

  /**
   * Start the connection manager: launches the asynchronous connection
   * creator and schedules the periodic cleanup task.
   */
  public void start() {
    // Start the thread that creates connections asynchronously
    this.creator.start();

    // Schedule a task to remove stale connection pools and sockets.
    // A single task serves both purposes, so it runs at the shorter period.
    long recycleTimeMs = Math.min(
        poolCleanupPeriodMs, connectionCleanupPeriodMs);
    LOG.info("Cleaning every {} seconds",
        TimeUnit.MILLISECONDS.toSeconds(recycleTimeMs));
    this.cleaner.scheduleAtFixedRate(
        new CleanupTask(), 0, recycleTimeMs, TimeUnit.MILLISECONDS);

    // Mark the manager as running
    this.running = true;
  }

  /**
   * Stop the connection manager by closing all the pools.
   */
  public void close() {
    // Reject new requests first so no caller is handed a connection from a
    // pool that is about to be closed.
    this.running = false;
    this.creator.shutdown();
    this.cleaner.shutdown();

    writeLock.lock();
    try {
      for (ConnectionPool pool : this.pools.values()) {
        pool.close();
      }
      this.pools.clear();
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Fetches the next available proxy client in the pool. Each client connection
   * is reserved for a single user and cannot be reused until free.
   *
   * @param ugi User group information.
   * @param nnAddress Namenode address for the connection.
   * @return Proxy client to connect to nnId as UGI, or {@code null} if the
   *         manager is not running, the pool has no connection to offer, or
   *         the connection obtained from the pool is already closed.
   * @throws IOException If the connection cannot be obtained.
   */
  public ConnectionContext getConnection(
      UserGroupInformation ugi, String nnAddress) throws IOException {

    // Check if the manager is shutdown
    if (!this.running) {
      LOG.error(
          "Cannot get a connection to {} because the manager isn't running",
          nnAddress);
      return null;
    }

    // Try to get the pool if created
    ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress);
    ConnectionPool pool = null;
    readLock.lock();
    try {
      pool = this.pools.get(connectionId);
    } finally {
      readLock.unlock();
    }

    // Create the pool if not created before
    if (pool == null) {
      writeLock.lock();
      try {
        // Look again under the write lock: another thread may have created
        // the pool between releasing the read lock and getting here.
        pool = this.pools.get(connectionId);
        if (pool == null) {
          pool = new ConnectionPool(
              this.conf, nnAddress, ugi, this.minSize, this.maxSize);
          this.pools.put(connectionId, pool);
        }
      } finally {
        writeLock.unlock();
      }
    }

    ConnectionContext conn = pool.getConnection();

    // Add a new connection to the pool if it wasn't usable
    if (conn == null || !conn.isUsable()) {
      if (!this.creatorQueue.offer(pool)) {
        LOG.error("Cannot add more than {} connections at the same time",
            MAX_NEW_CONNECTIONS);
      }
    }

    if (conn != null && conn.isClosed()) {
      LOG.error("We got a closed connection from {}", pool);
      conn = null;
    }

    return conn;
  }

  /**
   * Get the number of connection pools.
   *
   * @return Number of connection pools.
   */
  public int getNumConnectionPools() {
    readLock.lock();
    try {
      return pools.size();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get number of open connections.
   *
   * @return Number of open connections.
   */
  public int getNumConnections() {
    int total = 0;
    readLock.lock();
    try {
      for (ConnectionPool pool : this.pools.values()) {
        total += pool.getNumConnections();
      }
    } finally {
      readLock.unlock();
    }
    return total;
  }

  /**
   * Get number of active connections.
   *
   * @return Number of active connections.
   */
  public int getNumActiveConnections() {
    int total = 0;
    readLock.lock();
    try {
      for (ConnectionPool pool : this.pools.values()) {
        total += pool.getNumActiveConnections();
      }
    } finally {
      readLock.unlock();
    }
    return total;
  }

  /**
   * Get the number of connections to be created.
   *
   * @return Number of connections to be created.
   */
  public int getNumCreatingConnections() {
    return this.creatorQueue.size();
  }

  /**
   * Get a JSON representation of the connection pool.
   *
   * @return JSON representation of all the connection pools.
   */
  public String getJSON() {
    // TreeMap keeps the output sorted by pool identifier string
    final Map<String, String> info = new TreeMap<>();
    readLock.lock();
    try {
      for (Entry<ConnectionPoolId, ConnectionPool> entry :
          this.pools.entrySet()) {
        ConnectionPoolId connectionPoolId = entry.getKey();
        ConnectionPool pool = entry.getValue();
        info.put(connectionPoolId.toString(), pool.getJSON());
      }
    } finally {
      readLock.unlock();
    }
    return JSON.toString(info);
  }

  /**
   * Expose the internal pool map for tests. The live map is returned, not a
   * copy, and no lock is taken here.
   *
   * @return Map of connection pools.
   */
  @VisibleForTesting
  Map<ConnectionPoolId, ConnectionPool> getPools() {
    return this.pools;
  }

  /**
   * Clean the unused connections for this pool. At most one connection is
   * removed per invocation, and never below the pool's minimum size.
   *
   * @param pool Connection pool to cleanup.
   */
  @VisibleForTesting
  void cleanup(ConnectionPool pool) {
    if (pool.getNumConnections() > pool.getMinSize()) {
      // Check if the pool hasn't been active in a while or not 50% are used
      long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
      int total = pool.getNumConnections();
      int active = pool.getNumActiveConnections();
      if (timeSinceLastActive > connectionCleanupPeriodMs ||
          active < MIN_ACTIVE_RATIO * total) {
        // Remove and close 1 connection
        List<ConnectionContext> conns = pool.removeConnections(1);
        for (ConnectionContext conn : conns) {
          conn.close();
        }
        LOG.debug("Removed connection {} used {} seconds ago. " +
            "Pool has {}/{} connections", pool.getConnectionPoolId(),
            TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive),
            pool.getNumConnections(), pool.getMaxSize());
      }
    }
  }

  /**
   * Removes stale connections not accessed recently from the pool. This is
   * invoked periodically.
   */
  private class CleanupTask implements Runnable {

    @Override
    public void run() {
      long currentTime = Time.now();
      List<ConnectionPoolId> toRemove = new LinkedList<>();

      // Look for stale pools
      readLock.lock();
      try {
        for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) {
          ConnectionPool pool = entry.getValue();
          long lastTimeActive = pool.getLastActiveTime();
          boolean isStale =
              currentTime > (lastTimeActive + poolCleanupPeriodMs);
          // A last-active time of 0 means the pool was never used; keep it
          if (lastTimeActive > 0 && isStale) {
            // Remove this pool
            LOG.debug("Closing and removing stale pool {}", pool);
            pool.close();
            ConnectionPoolId poolId = entry.getKey();
            toRemove.add(poolId);
          } else {
            // Keep this pool but clean connections inside
            LOG.debug("Cleaning up {}", pool);
            cleanup(pool);
          }
        }
      } finally {
        readLock.unlock();
      }

      // Remove stale pools. Done in a second pass because the map cannot be
      // modified while iterating it under the read lock.
      if (!toRemove.isEmpty()) {
        writeLock.lock();
        try {
          for (ConnectionPoolId poolId : toRemove) {
            pools.remove(poolId);
          }
        } finally {
          writeLock.unlock();
        }
      }
    }
  }

  /**
   * Thread that creates connections asynchronously.
   */
  private static class ConnectionCreator extends Thread {
    /**
     * If the creator is running. Volatile because {@link #shutdown()} is
     * called from a different thread than the one executing {@link #run()}.
     */
    private volatile boolean running = true;
    /** Queue to push work to. */
    private final BlockingQueue<ConnectionPool> queue;

    ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) {
      super("Connection creator");
      this.queue = blockingQueue;
    }

    @Override
    public void run() {
      while (this.running) {
        try {
          // Blocks until a pool asks for a new connection
          ConnectionPool pool = this.queue.take();
          try {
            // Take one snapshot of the counters and decide on it
            int total = pool.getNumConnections();
            int active = pool.getNumActiveConnections();
            if (total < pool.getMaxSize() &&
                active >= MIN_ACTIVE_RATIO * total) {
              ConnectionContext conn = pool.newConnection();
              pool.addConnection(conn);
            } else {
              LOG.debug("Cannot add more than {} connections to {}",
                  pool.getMaxSize(), pool);
            }
          } catch (IOException e) {
            LOG.error("Cannot create a new connection", e);
          }
        } catch (InterruptedException e) {
          LOG.error("The connection creator was interrupted");
          this.running = false;
          // Restore the interrupt status so it is not silently lost
          Thread.currentThread().interrupt();
        }
      }
    }

    /**
     * Stop this connection creator.
     */
    public void shutdown() {
      this.running = false;
      // Wake the thread up if it is blocked waiting on the queue
      this.interrupt();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java deleted file mode 100644 index 06bed9c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ /dev/null @@ -1,337 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.net.SocketFactory; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.io.retry.RetryUtils; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Time; -import org.mortbay.util.ajax.JSON; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Maintains a pool of connections for each User (including tokens) + NN. The - * RPC client maintains a single socket, to achieve throughput similar to a NN, - * each request is multiplexed across multiple sockets/connections from a - * pool. - */ [email protected] [email protected] -public class ConnectionPool { - - private static final Logger LOG = - LoggerFactory.getLogger(ConnectionPool.class); - - - /** Configuration settings for the connection pool. 
*/ - private final Configuration conf; - - /** Identifier for this connection pool. */ - private final ConnectionPoolId connectionPoolId; - /** Namenode this pool connects to. */ - private final String namenodeAddress; - /** User for this connections. */ - private final UserGroupInformation ugi; - - /** Pool of connections. We mimic a COW array. */ - private volatile List<ConnectionContext> connections = new ArrayList<>(); - /** Connection index for round-robin. */ - private final AtomicInteger clientIndex = new AtomicInteger(0); - - /** Min number of connections per user. */ - private final int minSize; - /** Max number of connections per user. */ - private final int maxSize; - - /** The last time a connection was active. */ - private volatile long lastActiveTime = 0; - - - protected ConnectionPool(Configuration config, String address, - UserGroupInformation user, int minPoolSize, int maxPoolSize) - throws IOException { - - this.conf = config; - - // Connection pool target - this.ugi = user; - this.namenodeAddress = address; - this.connectionPoolId = - new ConnectionPoolId(this.ugi, this.namenodeAddress); - - // Set configuration parameters for the pool - this.minSize = minPoolSize; - this.maxSize = maxPoolSize; - - // Add minimum connections to the pool - for (int i=0; i<this.minSize; i++) { - ConnectionContext newConnection = newConnection(); - this.connections.add(newConnection); - } - LOG.debug("Created connection pool \"{}\" with {} connections", - this.connectionPoolId, this.minSize); - } - - /** - * Get the maximum number of connections allowed in this pool. - * - * @return Maximum number of connections. - */ - protected int getMaxSize() { - return this.maxSize; - } - - /** - * Get the minimum number of connections in this pool. - * - * @return Minimum number of connections. - */ - protected int getMinSize() { - return this.minSize; - } - - /** - * Get the connection pool identifier. - * - * @return Connection pool identifier. 
- */ - protected ConnectionPoolId getConnectionPoolId() { - return this.connectionPoolId; - } - - /** - * Return the next connection round-robin. - * - * @return Connection context. - */ - protected ConnectionContext getConnection() { - - this.lastActiveTime = Time.now(); - - // Get a connection from the pool following round-robin - ConnectionContext conn = null; - List<ConnectionContext> tmpConnections = this.connections; - int size = tmpConnections.size(); - int threadIndex = this.clientIndex.getAndIncrement(); - for (int i=0; i<size; i++) { - int index = (threadIndex + i) % size; - conn = tmpConnections.get(index); - if (conn != null && conn.isUsable()) { - return conn; - } - } - - // We return a connection even if it's active - return conn; - } - - /** - * Add a connection to the current pool. It uses a Copy-On-Write approach. - * - * @param conn New connection to add to the pool. - */ - public synchronized void addConnection(ConnectionContext conn) { - List<ConnectionContext> tmpConnections = new ArrayList<>(this.connections); - tmpConnections.add(conn); - this.connections = tmpConnections; - - this.lastActiveTime = Time.now(); - } - - /** - * Remove connections from the current pool. - * - * @param num Number of connections to remove. - * @return Removed connections. - */ - public synchronized List<ConnectionContext> removeConnections(int num) { - List<ConnectionContext> removed = new LinkedList<>(); - - // Remove and close the last connection - List<ConnectionContext> tmpConnections = new ArrayList<>(); - for (int i=0; i<this.connections.size(); i++) { - ConnectionContext conn = this.connections.get(i); - if (i < this.minSize || i < this.connections.size() - num) { - tmpConnections.add(conn); - } else { - removed.add(conn); - } - } - this.connections = tmpConnections; - - return removed; - } - - /** - * Close the connection pool. 
- */ - protected synchronized void close() { - long timeSinceLastActive = TimeUnit.MILLISECONDS.toSeconds( - Time.now() - getLastActiveTime()); - LOG.debug("Shutting down connection pool \"{}\" used {} seconds ago", - this.connectionPoolId, timeSinceLastActive); - - for (ConnectionContext connection : this.connections) { - connection.close(); - } - this.connections.clear(); - } - - /** - * Number of connections in the pool. - * - * @return Number of connections. - */ - protected int getNumConnections() { - return this.connections.size(); - } - - /** - * Number of active connections in the pool. - * - * @return Number of active connections. - */ - protected int getNumActiveConnections() { - int ret = 0; - - List<ConnectionContext> tmpConnections = this.connections; - for (ConnectionContext conn : tmpConnections) { - if (conn.isActive()) { - ret++; - } - } - return ret; - } - - /** - * Get the last time the connection pool was used. - * - * @return Last time the connection pool was used. - */ - protected long getLastActiveTime() { - return this.lastActiveTime; - } - - @Override - public String toString() { - return this.connectionPoolId.toString(); - } - - /** - * JSON representation of the connection pool. - * - * @return String representation of the JSON. - */ - public String getJSON() { - final Map<String, String> info = new LinkedHashMap<>(); - info.put("active", Integer.toString(getNumActiveConnections())); - info.put("total", Integer.toString(getNumConnections())); - if (LOG.isDebugEnabled()) { - List<ConnectionContext> tmpConnections = this.connections; - for (int i=0; i<tmpConnections.size(); i++) { - ConnectionContext connection = tmpConnections.get(i); - info.put(i + " active", Boolean.toString(connection.isActive())); - info.put(i + " closed", Boolean.toString(connection.isClosed())); - } - } - return JSON.toString(info); - } - - /** - * Create a new proxy wrapper for a client NN connection. 
- * @return Proxy for the target ClientProtocol that contains the user's - * security context. - * @throws IOException - */ - public ConnectionContext newConnection() throws IOException { - return newConnection(this.conf, this.namenodeAddress, this.ugi); - } - - /** - * Creates a proxy wrapper for a client NN connection. Each proxy contains - * context for a single user/security context. To maximize throughput it is - * recommended to use multiple connection per user+server, allowing multiple - * writes and reads to be dispatched in parallel. - * - * @param conf Configuration for the connection. - * @param nnAddress Address of server supporting the ClientProtocol. - * @param ugi User context. - * @return Proxy for the target ClientProtocol that contains the user's - * security context. - * @throws IOException If it cannot be created. - */ - protected static ConnectionContext newConnection(Configuration conf, - String nnAddress, UserGroupInformation ugi) - throws IOException { - RPC.setProtocolEngine( - conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); - - final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy( - conf, - HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, - HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, - HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY, - HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT, - HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME); - - SocketFactory factory = SocketFactory.getDefault(); - if (UserGroupInformation.isSecurityEnabled()) { - SaslRpcServer.init(conf); - } - InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); - final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); - ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( - ClientNamenodeProtocolPB.class, version, socket, ugi, conf, - factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); - ClientProtocol client = new ClientNamenodeProtocolTranslatorPB(proxy); - Text dtService = 
SecurityUtil.buildTokenService(socket); - - ProxyAndInfo<ClientProtocol> clientProxy = - new ProxyAndInfo<ClientProtocol>(client, dtService, socket); - ConnectionContext connection = new ConnectionContext(clientProxy); - return connection; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java deleted file mode 100644 index 6e1ee9a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; - -/** - * Identifier for a connection for a user to a namenode. - */ [email protected] [email protected] -public class ConnectionPoolId implements Comparable<ConnectionPoolId> { - - /** Namenode identifier. */ - private final String nnId; - /** Information about the user. */ - private final UserGroupInformation ugi; - - /** - * New connection pool identifier. - * - * @param ugi Information of the user issuing the request. - * @param nnId Namenode address with port. 
- */ - public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) { - this.nnId = nnId; - this.ugi = ugi; - } - - @Override - public int hashCode() { - int hash = new HashCodeBuilder(17, 31) - .append(this.nnId) - .append(this.ugi.toString()) - .append(this.getTokenIds()) - .toHashCode(); - return hash; - } - - @Override - public boolean equals(Object o) { - if (o instanceof ConnectionPoolId) { - ConnectionPoolId other = (ConnectionPoolId) o; - if (!this.nnId.equals(other.nnId)) { - return false; - } - if (!this.ugi.toString().equals(other.ugi.toString())) { - return false; - } - String thisTokens = this.getTokenIds().toString(); - String otherTokens = other.getTokenIds().toString(); - return thisTokens.equals(otherTokens); - } - return false; - } - - @Override - public String toString() { - return this.ugi + " " + this.getTokenIds() + "->" + this.nnId; - } - - @Override - public int compareTo(ConnectionPoolId other) { - int ret = this.nnId.compareTo(other.nnId); - if (ret == 0) { - ret = this.ugi.toString().compareTo(other.ugi.toString()); - } - if (ret == 0) { - String thisTokens = this.getTokenIds().toString(); - String otherTokens = other.getTokenIds().toString(); - ret = thisTokens.compareTo(otherTokens); - } - return ret; - } - - @VisibleForTesting - UserGroupInformation getUgi() { - return this.ugi; - } - - /** - * Get the token identifiers for this connection. - * @return List with the token identifiers. - */ - private List<String> getTokenIds() { - List<String> tokenIds = new ArrayList<>(); - Collection<Token<? extends TokenIdentifier>> tokens = this.ugi.getTokens(); - for (Token<? 
extends TokenIdentifier> token : tokens) { - byte[] tokenIdBytes = token.getIdentifier(); - String tokenId = Arrays.toString(tokenIdBytes); - tokenIds.add(tokenId); - } - Collections.sort(tokenIds); - return tokenIds; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java deleted file mode 100644 index a2ac258..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.util.ExitUtil.terminate; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.service.CompositeService.CompositeServiceShutdownHook; -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tool to start the {@link Router} for Router-based federation. - */ -public final class DFSRouter { - - private static final Logger LOG = LoggerFactory.getLogger(DFSRouter.class); - - - /** Usage string for help message. */ - private static final String USAGE = "Usage: hdfs dfsrouter"; - - /** Priority of the Router shutdown hook. */ - public static final int SHUTDOWN_HOOK_PRIORITY = 30; - - - private DFSRouter() { - // This is just a class to trigger the Router - } - - /** - * Main run loop for the router. - * - * @param argv parameters. 
- */ - public static void main(String[] argv) { - if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) { - System.exit(0); - } - - try { - StringUtils.startupShutdownMessage(Router.class, argv, LOG); - - Router router = new Router(); - - ShutdownHookManager.get().addShutdownHook( - new CompositeServiceShutdownHook(router), SHUTDOWN_HOOK_PRIORITY); - - Configuration conf = new HdfsConfiguration(); - router.init(conf); - router.start(); - } catch (Throwable e) { - LOG.error("Failed to start router", e); - terminate(1, e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java deleted file mode 100644 index 3dfd998..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.lang.reflect.Constructor; -import java.net.URL; -import java.net.URLConnection; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; -import org.apache.hadoop.util.VersionInfo; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utilities for managing HDFS federation. - */ -public final class FederationUtil { - - private static final Logger LOG = - LoggerFactory.getLogger(FederationUtil.class); - - private FederationUtil() { - // Utility Class - } - - /** - * Get a JMX data from a web endpoint. - * - * @param beanQuery JMX bean. - * @param webAddress Web address of the JMX endpoint. 
- * @return JSON with the JMX data - */ - public static JSONArray getJmx(String beanQuery, String webAddress) { - JSONArray ret = null; - BufferedReader reader = null; - try { - String host = webAddress; - int port = -1; - if (webAddress.indexOf(":") > 0) { - String[] webAddressSplit = webAddress.split(":"); - host = webAddressSplit[0]; - port = Integer.parseInt(webAddressSplit[1]); - } - URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery); - URLConnection conn = jmxURL.openConnection(); - conn.setConnectTimeout(5 * 1000); - conn.setReadTimeout(5 * 1000); - InputStream in = conn.getInputStream(); - InputStreamReader isr = new InputStreamReader(in, "UTF-8"); - reader = new BufferedReader(isr); - - StringBuilder sb = new StringBuilder(); - String line = null; - while ((line = reader.readLine()) != null) { - sb.append(line); - } - String jmxOutput = sb.toString(); - - // Parse JSON - JSONObject json = new JSONObject(jmxOutput); - ret = json.getJSONArray("beans"); - } catch (IOException e) { - LOG.error("Cannot read JMX bean {} from server {}: {}", - beanQuery, webAddress, e.getMessage()); - } catch (JSONException e) { - LOG.error("Cannot parse JMX output for {} from server {}: {}", - beanQuery, webAddress, e.getMessage()); - } catch (Exception e) { - LOG.error("Cannot parse JMX output for {} from server {}: {}", - beanQuery, webAddress, e); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - LOG.error("Problem closing {}", webAddress, e); - } - } - } - return ret; - } - - /** - * Fetch the Hadoop version string for this jar. - * - * @return Hadoop version string, e.g., 3.0.1. - */ - public static String getVersion() { - return VersionInfo.getVersion(); - } - - /** - * Fetch the build/compile information for this jar. - * - * @return String Compilation info. 
- */ - public static String getCompileInfo() { - return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " - + VersionInfo.getBranch(); - } - - /** - * Create an instance of an interface with a constructor using a context. - * - * @param conf Configuration for the class names. - * @param context Context object to pass to the instance. - * @param contextClass Type of the context passed to the constructor. - * @param clazz Class of the object to return. - * @return New instance of the specified class that implements the desired - * interface and a single parameter constructor containing a - * StateStore reference. - */ - private static <T, R> T newInstance(final Configuration conf, - final R context, final Class<R> contextClass, final Class<T> clazz) { - try { - if (contextClass == null) { - // Default constructor if no context - Constructor<T> constructor = clazz.getConstructor(); - return constructor.newInstance(); - } else { - // Constructor with context - Constructor<T> constructor = clazz.getConstructor( - Configuration.class, contextClass); - return constructor.newInstance(conf, context); - } - } catch (ReflectiveOperationException e) { - LOG.error("Could not instantiate: {}", clazz.getSimpleName(), e); - return null; - } - } - - /** - * Creates an instance of a FileSubclusterResolver from the configuration. - * - * @param conf Configuration that defines the file resolver class. - * @param router Router service. - * @return New file subcluster resolver. - */ - public static FileSubclusterResolver newFileSubclusterResolver( - Configuration conf, Router router) { - Class<? extends FileSubclusterResolver> clazz = conf.getClass( - DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, - DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT, - FileSubclusterResolver.class); - return newInstance(conf, router, Router.class, clazz); - } - - /** - * Creates an instance of an ActiveNamenodeResolver from the configuration. 
- * - * @param conf Configuration that defines the namenode resolver class. - * @param stateStore State store passed to class constructor. - * @return New active namenode resolver. - */ - public static ActiveNamenodeResolver newActiveNamenodeResolver( - Configuration conf, StateStoreService stateStore) { - Class<? extends ActiveNamenodeResolver> clazz = conf.getClass( - DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, - DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT, - ActiveNamenodeResolver.class); - return newInstance(conf, stateStore, StateStoreService.class, clazz); - } - - /** - * Check if the given path is the child of parent path. - * @param path Path to be check. - * @param parent Parent path. - * @return True if parent path is parent entry for given path. - */ - public static boolean isParentEntry(final String path, final String parent) { - if (!path.startsWith(parent)) { - return false; - } - - if (path.equals(parent)) { - return true; - } - - return path.charAt(parent.length()) == Path.SEPARATOR_CHAR - || parent.equals(Path.SEPARATOR); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java deleted file mode 100644 index 7d69a26..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java +++ /dev/null @@ -1,361 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.HAServiceProtocol; -import org.apache.hadoop.ha.HAServiceStatus; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.NameNodeProxies; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.hdfs.tools.NNHAServiceTarget; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * 
The {@link Router} periodically checks the state of a Namenode (usually on - * the same server) and reports their high availability (HA) state and - * load/space status to the - * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService} - * . Note that this is an optional role as a Router can be independent of any - * subcluster. - * <p> - * For performance with Namenode HA, the Router uses the high availability state - * information in the State Store to forward the request to the Namenode that is - * most likely to be active. - * <p> - * Note that this service can be embedded into the Namenode itself to simplify - * the operation. - */ -public class NamenodeHeartbeatService extends PeriodicService { - - private static final Logger LOG = - LoggerFactory.getLogger(NamenodeHeartbeatService.class); - - - /** Configuration for the heartbeat. */ - private Configuration conf; - - /** Router performing the heartbeating. */ - private final ActiveNamenodeResolver resolver; - - /** Interface to the tracked NN. */ - private final String nameserviceId; - private final String namenodeId; - - /** Namenode HA target. */ - private NNHAServiceTarget localTarget; - /** RPC address for the namenode. */ - private String rpcAddress; - /** Service RPC address for the namenode. */ - private String serviceAddress; - /** Service RPC address for the namenode. */ - private String lifelineAddress; - /** HTTP address for the namenode. */ - private String webAddress; - - /** - * Create a new Namenode status updater. - * @param resolver Namenode resolver service to handle NN registration. - * @param nsId Identifier of the nameservice. - * @param nnId Identifier of the namenode in HA. - */ - public NamenodeHeartbeatService( - ActiveNamenodeResolver resolver, String nsId, String nnId) { - super(NamenodeHeartbeatService.class.getSimpleName() + - (nsId == null ? "" : " " + nsId) + - (nnId == null ? 
"" : " " + nnId)); - - this.resolver = resolver; - - this.nameserviceId = nsId; - this.namenodeId = nnId; - - } - - @Override - protected void serviceInit(Configuration configuration) throws Exception { - - this.conf = configuration; - - String nnDesc = nameserviceId; - if (this.namenodeId != null && !this.namenodeId.isEmpty()) { - this.localTarget = new NNHAServiceTarget( - conf, nameserviceId, namenodeId); - nnDesc += "-" + namenodeId; - } else { - this.localTarget = null; - } - - // Get the RPC address for the clients to connect - this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId); - LOG.info("{} RPC address: {}", nnDesc, rpcAddress); - - // Get the Service RPC address for monitoring - this.serviceAddress = - DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId); - if (this.serviceAddress == null) { - LOG.error("Cannot locate RPC service address for NN {}, " + - "using RPC address {}", nnDesc, this.rpcAddress); - this.serviceAddress = this.rpcAddress; - } - LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress); - - // Get the Lifeline RPC address for faster monitoring - this.lifelineAddress = - DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId); - if (this.lifelineAddress == null) { - this.lifelineAddress = this.serviceAddress; - } - LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress); - - // Get the Web address for UI - this.webAddress = - DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId); - LOG.info("{} Web address: {}", nnDesc, webAddress); - - this.setIntervalMs(conf.getLong( - DFS_ROUTER_HEARTBEAT_INTERVAL_MS, - DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT)); - - - super.serviceInit(configuration); - } - - @Override - public void periodicInvoke() { - updateState(); - } - - /** - * Get the RPC address for a Namenode. - * @param conf Configuration. - * @param nsId Name service identifier. - * @param nnId Name node identifier. - * @return RPC address in format hostname:1234. 
- */ - private static String getRpcAddress( - Configuration conf, String nsId, String nnId) { - - // Get it from the regular RPC setting - String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; - String ret = conf.get(confKey); - - if (nsId != null || nnId != null) { - // Get if for the proper nameservice and namenode - confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId); - ret = conf.get(confKey); - - // If not available, get it from the map - if (ret == null) { - Map<String, InetSocketAddress> rpcAddresses = - DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null); - InetSocketAddress sockAddr = null; - if (nnId != null) { - sockAddr = rpcAddresses.get(nnId); - } else if (rpcAddresses.size() == 1) { - // Get the only namenode in the namespace - sockAddr = rpcAddresses.values().iterator().next(); - } - if (sockAddr != null) { - InetAddress addr = sockAddr.getAddress(); - ret = addr.getHostName() + ":" + sockAddr.getPort(); - } - } - } - return ret; - } - - /** - * Update the state of the Namenode. 
- */ - private void updateState() { - NamenodeStatusReport report = getNamenodeStatusReport(); - if (!report.registrationValid()) { - // Not operational - LOG.error("Namenode is not operational: {}", getNamenodeDesc()); - } else if (report.haStateValid()) { - // block and HA status available - LOG.debug("Received service state: {} from HA namenode: {}", - report.getState(), getNamenodeDesc()); - } else if (localTarget == null) { - // block info available, HA status not expected - LOG.debug( - "Reporting non-HA namenode as operational: " + getNamenodeDesc()); - } else { - // block info available, HA status should be available, but was not - // fetched do nothing and let the current state stand - return; - } - try { - if (!resolver.registerNamenode(report)) { - LOG.warn("Cannot register namenode {}", report); - } - } catch (IOException e) { - LOG.info("Cannot register namenode in the State Store"); - } catch (Exception ex) { - LOG.error("Unhandled exception updating NN registration for {}", - getNamenodeDesc(), ex); - } - } - - /** - * Get the status report for the Namenode monitored by this heartbeater. - * @return Namenode status report. - */ - protected NamenodeStatusReport getNamenodeStatusReport() { - NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId, - namenodeId, rpcAddress, serviceAddress, lifelineAddress, webAddress); - - try { - LOG.debug("Probing NN at service address: {}", serviceAddress); - - URI serviceURI = new URI("hdfs://" + serviceAddress); - // Read the filesystem info from RPC (required) - NamenodeProtocol nn = NameNodeProxies - .createProxy(this.conf, serviceURI, NamenodeProtocol.class) - .getProxy(); - - if (nn != null) { - NamespaceInfo info = nn.versionRequest(); - if (info != null) { - report.setNamespaceInfo(info); - } - } - if (!report.registrationValid()) { - return report; - } - - // Check for safemode from the client protocol. 
Currently optional, but - // should be required at some point for QoS - try { - ClientProtocol client = NameNodeProxies - .createProxy(this.conf, serviceURI, ClientProtocol.class) - .getProxy(); - if (client != null) { - boolean isSafeMode = client.setSafeMode( - SafeModeAction.SAFEMODE_GET, false); - report.setSafeMode(isSafeMode); - } - } catch (Exception e) { - LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e); - } - - // Read the stats from JMX (optional) - updateJMXParameters(webAddress, report); - - if (localTarget != null) { - // Try to get the HA status - try { - // Determine if NN is active - // TODO: dynamic timeout - HAServiceProtocol haProtocol = localTarget.getProxy(conf, 30*1000); - HAServiceStatus status = haProtocol.getServiceStatus(); - report.setHAServiceState(status.getState()); - } catch (Throwable e) { - if (e.getMessage().startsWith("HA for namenode is not enabled")) { - LOG.error("HA for {} is not enabled", getNamenodeDesc()); - localTarget = null; - } else { - // Failed to fetch HA status, ignoring failure - LOG.error("Cannot fetch HA status for {}: {}", - getNamenodeDesc(), e.getMessage(), e); - } - } - } - } catch(IOException e) { - LOG.error("Cannot communicate with {}: {}", - getNamenodeDesc(), e.getMessage()); - } catch(Throwable e) { - // Generic error that we don't know about - LOG.error("Unexpected exception while communicating with {}: {}", - getNamenodeDesc(), e.getMessage(), e); - } - return report; - } - - /** - * Get the description of the Namenode to monitor. - * @return Description of the Namenode to monitor. - */ - public String getNamenodeDesc() { - if (namenodeId != null && !namenodeId.isEmpty()) { - return nameserviceId + "-" + namenodeId + ":" + serviceAddress; - } else { - return nameserviceId + ":" + serviceAddress; - } - } - - /** - * Get the parameters for a Namenode from JMX and add them to the report. - * @param address Web interface of the Namenode to monitor. 
- * @param report Namenode status report to update with JMX data. - */ - private void updateJMXParameters( - String address, NamenodeStatusReport report) { - try { - // TODO part of this should be moved to its own utility - String query = "Hadoop:service=NameNode,name=FSNamesystem*"; - JSONArray aux = FederationUtil.getJmx(query, address); - if (aux != null) { - for (int i = 0; i < aux.length(); i++) { - JSONObject jsonObject = aux.getJSONObject(i); - String name = jsonObject.getString("name"); - if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) { - report.setDatanodeInfo( - jsonObject.getInt("NumLiveDataNodes"), - jsonObject.getInt("NumDeadDataNodes"), - jsonObject.getInt("NumDecommissioningDataNodes"), - jsonObject.getInt("NumDecomLiveDataNodes"), - jsonObject.getInt("NumDecomDeadDataNodes")); - } else if (name.equals( - "Hadoop:service=NameNode,name=FSNamesystem")) { - report.setNamesystemInfo( - jsonObject.getLong("CapacityRemaining"), - jsonObject.getLong("CapacityTotal"), - jsonObject.getLong("FilesTotal"), - jsonObject.getLong("BlocksTotal"), - jsonObject.getLong("MissingBlocks"), - jsonObject.getLong("PendingReplicationBlocks"), - jsonObject.getLong("UnderReplicatedBlocks"), - jsonObject.getLong("PendingDeletionBlocks")); - } - } - } - } catch (Exception e) { - LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java deleted file mode 100644 index 5e12222..0000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.service.ServiceStateException; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * Service to periodically execute a runnable. - */ -public abstract class PeriodicService extends AbstractService { - - private static final Logger LOG = - LoggerFactory.getLogger(PeriodicService.class); - - /** Default interval in milliseconds for the periodic service. */ - private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); - - - /** Interval for running the periodic service in milliseconds. */ - private long intervalMs; - /** Name of the service. 
*/ - private final String serviceName; - - /** Scheduler for the periodic service. */ - private final ScheduledExecutorService scheduler; - - /** If the service is running. */ - private volatile boolean isRunning = false; - - /** How many times we run. */ - private long runCount; - /** How many errors we got. */ - private long errorCount; - /** When was the last time we executed this service successfully. */ - private long lastRun; - - /** - * Create a new periodic update service. - * - * @param name Name of the service. - */ - public PeriodicService(String name) { - this(name, DEFAULT_INTERVAL_MS); - } - - /** - * Create a new periodic update service. - * - * @param name Name of the service. - * @param interval Interval for the periodic service in milliseconds. - */ - public PeriodicService(String name, long interval) { - super(name); - this.serviceName = name; - this.intervalMs = interval; - - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat(this.getName() + "-%d") - .build(); - this.scheduler = Executors.newScheduledThreadPool(1, threadFactory); - } - - /** - * Set the interval for the periodic service. - * - * @param interval Interval in milliseconds. - */ - protected void setIntervalMs(long interval) { - if (getServiceState() == STATE.STARTED) { - throw new ServiceStateException("Periodic service already started"); - } else { - this.intervalMs = interval; - } - } - - /** - * Get the interval for the periodic service. - * - * @return Interval in milliseconds. - */ - protected long getIntervalMs() { - return this.intervalMs; - } - - /** - * Get how many times we failed to run the periodic service. - * - * @return Times we failed to run the periodic service. - */ - protected long getErrorCount() { - return this.errorCount; - } - - /** - * Get how many times we run the periodic service. - * - * @return Times we run the periodic service. 
- */ - protected long getRunCount() { - return this.runCount; - } - - /** - * Get the last time the periodic service was executed. - * - * @return Last time the periodic service was executed. - */ - protected long getLastUpdate() { - return this.lastRun; - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - LOG.info("Starting periodic service {}", this.serviceName); - startPeriodic(); - } - - @Override - protected void serviceStop() throws Exception { - stopPeriodic(); - LOG.info("Stopping periodic service {}", this.serviceName); - super.serviceStop(); - } - - /** - * Stop the periodic task. - */ - protected synchronized void stopPeriodic() { - if (this.isRunning) { - LOG.info("{} is shutting down", this.serviceName); - this.isRunning = false; - this.scheduler.shutdownNow(); - } - } - - /** - * Start the periodic execution. - */ - protected synchronized void startPeriodic() { - stopPeriodic(); - - // Create the runnable service - Runnable updateRunnable = new Runnable() { - @Override - public void run() { - LOG.debug("Running {} update task", serviceName); - try { - if (!isRunning) { - return; - } - periodicInvoke(); - runCount++; - lastRun = Time.now(); - } catch (Exception ex) { - errorCount++; - LOG.warn(serviceName + " service threw an exception", ex); - } - } - }; - - // Start the execution of the periodic service - this.isRunning = true; - this.scheduler.scheduleWithFixedDelay( - updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS); - } - - /** - * Method that the service will run periodically. 
- */ - protected abstract void periodicInvoke(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java deleted file mode 100644 index dbb6ffa..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java +++ /dev/null @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.fs.QuotaUsage; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Module that implements the quota relevant RPC calls - * {@link ClientProtocol#setQuota(String, long, long, StorageType)} - * and - * {@link ClientProtocol#getQuotaUsage(String)} - * in the {@link RouterRpcServer}. - */ -public class Quota { - private static final Logger LOG = LoggerFactory.getLogger(Quota.class); - - /** RPC server to receive client calls. */ - private final RouterRpcServer rpcServer; - /** RPC clients to connect to the Namenodes. */ - private final RouterRpcClient rpcClient; - /** Router used in RouterRpcServer. */ - private final Router router; - - public Quota(Router router, RouterRpcServer server) { - this.router = router; - this.rpcServer = server; - this.rpcClient = server.getRPCClient(); - } - - /** - * Set quota for the federation path. - * @param path Federation path. - * @param namespaceQuota Name space quota. - * @param storagespaceQuota Storage space quota. - * @param type StorageType that the space quota is intended to be set on. - * @throws IOException - */ - public void setQuota(String path, long namespaceQuota, - long storagespaceQuota, StorageType type) throws IOException { - rpcServer.checkOperation(OperationCategory.WRITE); - - // Set quota for current path and its children mount table path. 
- final List<RemoteLocation> locations = getQuotaRemoteLocations(path); - if (LOG.isDebugEnabled()) { - for (RemoteLocation loc : locations) { - LOG.debug("Set quota for path: nsId: {}, dest: {}.", - loc.getNameserviceId(), loc.getDest()); - } - } - - RemoteMethod method = new RemoteMethod("setQuota", - new Class<?>[] {String.class, long.class, long.class, - StorageType.class}, - new RemoteParam(), namespaceQuota, storagespaceQuota, type); - rpcClient.invokeConcurrent(locations, method, false, false); - } - - /** - * Get quota usage for the federation path. - * @param path Federation path. - * @return Aggregated quota. - * @throws IOException - */ - public QuotaUsage getQuotaUsage(String path) throws IOException { - final List<RemoteLocation> quotaLocs = getValidQuotaLocations(path); - RemoteMethod method = new RemoteMethod("getQuotaUsage", - new Class<?>[] {String.class}, new RemoteParam()); - Map<RemoteLocation, QuotaUsage> results = rpcClient.invokeConcurrent( - quotaLocs, method, true, false, QuotaUsage.class); - - return aggregateQuota(results); - } - - /** - * Get valid quota remote locations used in {@link #getQuotaUsage(String)}. - * Differentiate the method {@link #getQuotaRemoteLocations(String)}, this - * method will do some additional filtering. - * @param path Federation path. - * @return List of valid quota remote locations. - * @throws IOException - */ - private List<RemoteLocation> getValidQuotaLocations(String path) - throws IOException { - final List<RemoteLocation> locations = getQuotaRemoteLocations(path); - - // NameService -> Locations - Map<String, List<RemoteLocation>> validLocations = new HashMap<>(); - for (RemoteLocation loc : locations) { - String nsId = loc.getNameserviceId(); - List<RemoteLocation> dests = validLocations.get(nsId); - if (dests == null) { - dests = new LinkedList<>(); - dests.add(loc); - validLocations.put(nsId, dests); - } else { - // Ensure the paths in the same nameservice is different. 
- // Don't include parent-child paths. - boolean isChildPath = false; - for (RemoteLocation d : dests) { - if (loc.getDest().startsWith(d.getDest())) { - isChildPath = true; - break; - } - } - - if (!isChildPath) { - dests.add(loc); - } - } - } - - List<RemoteLocation> quotaLocs = new LinkedList<>(); - for (List<RemoteLocation> locs : validLocations.values()) { - quotaLocs.addAll(locs); - } - - return quotaLocs; - } - - /** - * Aggregate quota that queried from sub-clusters. - * @param results Quota query result. - * @return Aggregated Quota. - */ - private QuotaUsage aggregateQuota(Map<RemoteLocation, QuotaUsage> results) { - long nsCount = 0; - long ssCount = 0; - boolean hasQuotaUnSet = false; - - for (Map.Entry<RemoteLocation, QuotaUsage> entry : results.entrySet()) { - RemoteLocation loc = entry.getKey(); - QuotaUsage usage = entry.getValue(); - if (usage != null) { - // If quota is not set in real FileSystem, the usage - // value will return -1. - if (usage.getQuota() == -1 && usage.getSpaceQuota() == -1) { - hasQuotaUnSet = true; - } - - nsCount += usage.getFileAndDirectoryCount(); - ssCount += usage.getSpaceConsumed(); - LOG.debug( - "Get quota usage for path: nsId: {}, dest: {}," - + " nsCount: {}, ssCount: {}.", - loc.getNameserviceId(), loc.getDest(), - usage.getFileAndDirectoryCount(), usage.getSpaceConsumed()); - } - } - - QuotaUsage.Builder builder = new QuotaUsage.Builder() - .fileAndDirectoryCount(nsCount).spaceConsumed(ssCount); - if (hasQuotaUnSet) { - builder.quota(HdfsConstants.QUOTA_DONT_SET); - } - - return builder.build(); - } - - /** - * Get all quota remote locations across subclusters under given - * federation path. - * @param path Federation path. - * @return List of quota remote locations. 
- * @throws IOException - */ - private List<RemoteLocation> getQuotaRemoteLocations(String path) - throws IOException { - List<RemoteLocation> locations = new LinkedList<>(); - RouterQuotaManager manager = this.router.getQuotaManager(); - if (manager != null) { - Set<String> childrenPaths = manager.getPaths(path); - for (String childPath : childrenPaths) { - locations.addAll(rpcServer.getLocationsForPath(childPath, true)); - } - } - - return locations; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java deleted file mode 100644 index a90c460..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.commons.lang.builder.HashCodeBuilder; - -/** - * Base class for objects that are unique to a namespace. - */ -public abstract class RemoteLocationContext - implements Comparable<RemoteLocationContext> { - - /** - * Returns an identifier for a unique namespace. - * - * @return Namespace identifier. - */ - public abstract String getNameserviceId(); - - /** - * Destination in this location. For example the path in a remote namespace. - * - * @return Destination in this location. - */ - public abstract String getDest(); - - @Override - public int hashCode() { - return new HashCodeBuilder(17, 31) - .append(getNameserviceId()) - .append(getDest()) - .toHashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof RemoteLocationContext) { - RemoteLocationContext other = (RemoteLocationContext) obj; - return this.getNameserviceId().equals(other.getNameserviceId()) && - this.getDest().equals(other.getDest()); - } - return false; - } - - @Override - public int compareTo(RemoteLocationContext info) { - int ret = this.getNameserviceId().compareTo(info.getNameserviceId()); - if (ret == 0) { - ret = this.getDest().compareTo(info.getDest()); - } - return ret; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java deleted file mode 100644 index cd57d45..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.Arrays; - -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Determines the remote client protocol method and the parameter list for a - * specific location. - */ -public class RemoteMethod { - - private static final Logger LOG = LoggerFactory.getLogger(RemoteMethod.class); - - - /** List of parameters: static and dynamic values, matchings types. */ - private final Object[] params; - /** List of method parameters types, matches parameters. */ - private final Class<?>[] types; - /** String name of the ClientProtocol method. */ - private final String methodName; - - /** - * Create a method with no parameters. - * - * @param method The string name of the ClientProtocol method. - */ - public RemoteMethod(String method) { - this.params = null; - this.types = null; - this.methodName = method; - } - - /** - * Creates a remote method generator. - * - * @param method The string name of the ClientProtocol method. - * @param pTypes A list of types to use to locate the specific method. 
- * @param pParams A list of parameters for the method. The order of the - * parameter list must match the order and number of the types. - * Parameters are grouped into 2 categories: - * <ul> - * <li>Static parameters that are immutable across locations. - * <li>Dynamic parameters that are determined for each location by a - * RemoteParam object. To specify a dynamic parameter, pass an - * instance of RemoteParam in place of the parameter value. - * </ul> - * @throws IOException If the types and parameter lists are not valid. - */ - public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams) - throws IOException { - - if (pParams.length != pTypes.length) { - throw new IOException("Invalid parameters for method " + method); - } - - this.params = pParams; - this.types = pTypes; - this.methodName = method; - } - - /** - * Get the represented java method. - * - * @return Method - * @throws IOException If the method cannot be found. - */ - public Method getMethod() throws IOException { - try { - if (types != null) { - return ClientProtocol.class.getDeclaredMethod(methodName, types); - } else { - return ClientProtocol.class.getDeclaredMethod(methodName); - } - } catch (NoSuchMethodException e) { - // Re-throw as an IOException - LOG.error("Cannot get method {} with types {}", - methodName, Arrays.toString(types), e); - throw new IOException(e); - } catch (SecurityException e) { - LOG.error("Cannot access method {} with types {}", - methodName, Arrays.toString(types), e); - throw new IOException(e); - } - } - - /** - * Get the calling types for this method. - * - * @return An array of calling types. - */ - public Class<?>[] getTypes() { - return this.types; - } - - /** - * Generate a list of parameters for this specific location using no context. - * - * @return A list of parameters for the method customized for the location. - */ - public Object[] getParams() { - return this.getParams(null); - } - - /** - * Get the name of the method. 
- * - * @return Name of the method. - */ - public String getMethodName() { - return this.methodName; - } - - /** - * Generate a list of parameters for this specific location. Parameters are - * grouped into 2 categories: - * <ul> - * <li>Static parameters that are immutable across locations. - * <li>Dynamic parameters that are determined for each location by a - * RemoteParam object. - * </ul> - * - * @param context The context identifying the location. - * @return A list of parameters for the method customized for the location. - */ - public Object[] getParams(RemoteLocationContext context) { - if (this.params == null) { - return new Object[] {}; - } - Object[] objList = new Object[this.params.length]; - for (int i = 0; i < this.params.length; i++) { - Object currentObj = this.params[i]; - if (currentObj instanceof RemoteParam) { - // Map the parameter using the context - RemoteParam paramGetter = (RemoteParam) currentObj; - objList[i] = paramGetter.getParameterForContext(context); - } else { - objList[i] = currentObj; - } - } - return objList; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aa34324/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java deleted file mode 100644 index 8816ff6..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.util.Map; - -/** - * A dynamically assignable parameter that is location-specific. - * <p> - * There are 2 ways this mapping is determined: - * <ul> - * <li>Default: Uses the RemoteLocationContext's destination - * <li>Map: Uses the value of the RemoteLocationContext key provided in the - * parameter map. - * </ul> - */ -public class RemoteParam { - - private final Map<? extends Object, ? extends Object> paramMap; - - /** - * Constructs a default remote parameter. Always maps the value to the - * destination of the provided RemoveLocationContext. - */ - public RemoteParam() { - this.paramMap = null; - } - - /** - * Constructs a map based remote parameter. Determines the value using the - * provided RemoteLocationContext as a key into the map. - * - * @param map Map with RemoteLocationContext keys. - */ - public RemoteParam( - Map<? extends RemoteLocationContext, ? extends Object> map) { - this.paramMap = map; - } - - /** - * Determine the appropriate value for this parameter based on the location. - * - * @param context Context identifying the location. - * @return A parameter specific to this location. 
- */ - public Object getParameterForContext(RemoteLocationContext context) { - if (context == null) { - return null; - } else if (this.paramMap != null) { - return this.paramMap.get(context); - } else { - // Default case - return context.getDest(); - } - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
