http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java deleted file mode 100644 index 3508eab..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java +++ /dev/null @@ -1,297 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.resolver.order; - -import static org.apache.hadoop.util.Time.monotonicNow; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.net.HostAndPort; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.security.PrivilegedAction; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; -import org.apache.hadoop.hdfs.server.federation.router.Router; -import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; -import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; -import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; -import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; -import org.apache.hadoop.ipc.RPC.Server; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * The local subcluster (where the writer is) should be tried first. The writer - * is defined from the RPC query received in the RPC server. 
- */ -public class LocalResolver implements OrderedResolver { - - private static final Logger LOG = - LoggerFactory.getLogger(LocalResolver.class); - - /** Configuration key to set the minimum time to update the local cache.*/ - public static final String MIN_UPDATE_PERIOD_KEY = - DFSConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period"; - /** 10 seconds by default. */ - private static final long MIN_UPDATE_PERIOD_DEFAULT = - TimeUnit.SECONDS.toMillis(10); - - - /** Router service. */ - private final Router router; - /** Minimum update time. */ - private final long minUpdateTime; - - /** Node IP -> Subcluster. */ - private Map<String, String> nodeSubcluster = null; - /** Last time the subcluster map was updated. */ - private long lastUpdated; - - - public LocalResolver(final Configuration conf, final Router routerService) { - this.minUpdateTime = conf.getTimeDuration( - MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT, - TimeUnit.MILLISECONDS); - this.router = routerService; - } - - /** - * Get the local name space. This relies on the RPC Server to get the address - * from the client. - * - * TODO we only support DN and NN locations, we need to add others like - * Resource Managers. - * - * @param path Path ignored by this policy. - * @param loc Federated location with multiple destinations. - * @return Local name space. Null if we don't know about this machine. 
- */ - @Override - public String getFirstNamespace(final String path, final PathLocation loc) { - String localSubcluster = null; - String clientAddr = getClientAddr(); - Map<String, String> nodeToSubcluster = getSubclusterMappings(); - if (nodeToSubcluster != null) { - localSubcluster = nodeToSubcluster.get(clientAddr); - if (localSubcluster != null) { - LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster); - } else { - LOG.error("Cannot get local namespace for {}", clientAddr); - } - } else { - LOG.error("Cannot get node mapping when resolving {} at {} from {}", - path, loc, clientAddr); - } - return localSubcluster; - } - - @VisibleForTesting - String getClientAddr() { - return Server.getRemoteAddress(); - } - - /** - * Get the mapping from nodes to subcluster. It gets this mapping from the - * subclusters through expensive calls (e.g., RPC) and uses caching to avoid - * too many calls. The cache might be updated asynchronously to reduce - * latency. - * - * @return Node IP -> Subcluster. 
- */ - @VisibleForTesting - synchronized Map<String, String> getSubclusterMappings() { - if (nodeSubcluster == null || - (monotonicNow() - lastUpdated) > minUpdateTime) { - // Fetch the mapping asynchronously - Thread updater = new Thread(new Runnable() { - @Override - public void run() { - Map<String, String> mapping = new HashMap<>(); - - Map<String, String> dnSubcluster = getDatanodesSubcluster(); - if (dnSubcluster != null) { - mapping.putAll(dnSubcluster); - } - - Map<String, String> nnSubcluster = getNamenodesSubcluster(); - if (nnSubcluster != null) { - mapping.putAll(nnSubcluster); - } - nodeSubcluster = mapping; - lastUpdated = monotonicNow(); - } - }); - updater.start(); - - // Wait until initialized - if (nodeSubcluster == null) { - try { - LOG.debug("Wait to get the mapping for the first time"); - updater.join(); - } catch (InterruptedException e) { - LOG.error("Cannot wait for the updater to finish"); - } - } - } - return nodeSubcluster; - } - - /** - * Get the Datanode mapping from the subclusters from the Namenodes. This - * needs to be done as a privileged action to use the user for the Router and - * not the one from the client in the RPC call. - * - * @return DN IP -> Subcluster. 
- */ - private Map<String, String> getDatanodesSubcluster() { - - final RouterRpcServer rpcServer = getRpcServer(); - if (rpcServer == null) { - LOG.error("Cannot access the Router RPC server"); - return null; - } - - Map<String, String> ret = new HashMap<>(); - try { - // We need to get the DNs as a privileged user - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - Map<String, DatanodeStorageReport[]> dnMap = loginUser.doAs( - new PrivilegedAction<Map<String, DatanodeStorageReport[]>>() { - @Override - public Map<String, DatanodeStorageReport[]> run() { - try { - return rpcServer.getDatanodeStorageReportMap( - DatanodeReportType.ALL); - } catch (IOException e) { - LOG.error("Cannot get the datanodes from the RPC server", e); - return null; - } - } - }); - for (Entry<String, DatanodeStorageReport[]> entry : dnMap.entrySet()) { - String nsId = entry.getKey(); - DatanodeStorageReport[] dns = entry.getValue(); - for (DatanodeStorageReport dn : dns) { - DatanodeInfo dnInfo = dn.getDatanodeInfo(); - String ipAddr = dnInfo.getIpAddr(); - ret.put(ipAddr, nsId); - } - } - } catch (IOException e) { - LOG.error("Cannot get Datanodes from the Namenodes: {}", e.getMessage()); - } - return ret; - } - - /** - * Get the Namenode mapping from the subclusters from the Membership store. As - * the Routers are usually co-located with Namenodes, we also check for the - * local address for this Router here. - * - * @return NN IP -> Subcluster. 
- */ - private Map<String, String> getNamenodesSubcluster() { - - final MembershipStore membershipStore = getMembershipStore(); - if (membershipStore == null) { - LOG.error("Cannot access the Membership store"); - return null; - } - - // Manage requests from this hostname (127.0.0.1) - String localIp = "127.0.0.1"; - String localHostname = localIp; - try { - localHostname = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - LOG.error("Cannot get local host name"); - } - - Map<String, String> ret = new HashMap<>(); - try { - // Get the values from the store - GetNamenodeRegistrationsRequest request = - GetNamenodeRegistrationsRequest.newInstance(); - GetNamenodeRegistrationsResponse response = - membershipStore.getNamenodeRegistrations(request); - final List<MembershipState> nns = response.getNamenodeMemberships(); - for (MembershipState nn : nns) { - try { - String nsId = nn.getNameserviceId(); - String rpcAddress = nn.getRpcAddress(); - String hostname = HostAndPort.fromString(rpcAddress).getHostText(); - ret.put(hostname, nsId); - if (hostname.equals(localHostname)) { - ret.put(localIp, nsId); - } - - InetAddress addr = InetAddress.getByName(hostname); - String ipAddr = addr.getHostAddress(); - ret.put(ipAddr, nsId); - } catch (Exception e) { - LOG.error("Cannot get address for {}: {}", nn, e.getMessage()); - } - } - } catch (IOException ioe) { - LOG.error("Cannot get Namenodes from the State Store: {}", - ioe.getMessage()); - } - return ret; - } - - /** - * Get the Router RPC server. - * - * @return Router RPC server. Null if not possible. - */ - private RouterRpcServer getRpcServer() { - if (this.router == null) { - return null; - } - return router.getRpcServer(); - } - - /** - * Get the Membership store. - * - * @return Membership store. 
- */ - private MembershipStore getMembershipStore() { - StateStoreService stateStore = router.getStateStore(); - if (stateStore == null) { - return null; - } - return stateStore.getRegisteredRecordStore(MembershipStore.class); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java deleted file mode 100644 index 3a3ccf7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.resolver.order; - -import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; - - -/** - * Policy that decides which destination (namespace) should be accessed first - * when a federated location has multiple destinations. - */ -public interface OrderedResolver { - - /** - * Get the first namespace based on this resolver approach. - * - * @param path Path to check. - * @param loc Federated location with multiple destinations.
- * @return First namespace out of the locations. The LocalResolver and - * RandomResolver implementations return null when they cannot pick - * one, so callers should handle a null result. - */ - String getFirstNamespace(String path, PathLocation loc); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java deleted file mode 100644 index 022aa48..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.hdfs.server.federation.resolver.order; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.Set; - -import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Order the destinations randomly. - */ -public class RandomResolver implements OrderedResolver { - - private static final Logger LOG = - LoggerFactory.getLogger(RandomResolver.class); - - - /** Random number generator. */ - private static final Random RANDOM = new Random(); - - /** - * Get a random name space from the path. - * - * @param path Path ignored by this policy. - * @param loc Federated location with multiple destinations. - * @return Random name space. - */ - public String getFirstNamespace(final String path, final PathLocation loc) { - if (loc == null) { - return null; - } - Set<String> namespaces = loc.getNamespaces(); - if (namespaces == null || namespaces.isEmpty()) { - LOG.error("Cannot get namespaces for {}", loc); - return null; - } - List<String> nssList = new ArrayList<>(namespaces); - int index = RANDOM.nextInt(nssList.size()); - return nssList.get(index); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java deleted file mode 100644 index f90152f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * A federated location can be resolved to multiple subclusters. This package - * takes care of the order in which these multiple destinations should be used. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving - -package org.apache.hadoop.hdfs.server.federation.resolver.order; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java deleted file mode 100644 index d8be9e3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * The resolver package contains independent data resolvers used in HDFS - * federation. The data resolvers collect data from the cluster, including from - * the state store. The resolvers expose APIs used by HDFS federation to collect - * aggregated, cached data for use in Real-time request processing. The - * resolvers are perf-sensitive and are used in the flow of the - * {@link RouterRpcServer} request path. - * <p> - * The principal resolvers are: - * <ul> - * <li>{@link ActiveNamenodeResolver} Real-time interface for locating the most - * recently active NN for a nameservice. - * <li>{@link FileSubclusterResolver} Real-time interface for determining the NN - * and local file path for a given file/folder based on the global namespace - * path.
- * </ul> - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -package org.apache.hadoop.hdfs.server.federation.resolver; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java deleted file mode 100644 index 1d27b51..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.ipc.RPC; - -/** - * Context to track a connection in a {@link ConnectionPool}. When a client uses - * a connection, it increments a counter to mark it as active. Once the client - * is done with the connection, it decreases the counter. It also takes care of - * closing the connection once is not active. - */ -public class ConnectionContext { - - /** Client for the connection. */ - private final ProxyAndInfo<ClientProtocol> client; - /** How many threads are using this connection. */ - private int numThreads = 0; - /** If the connection is closed. */ - private boolean closed = false; - - - public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) { - this.client = connection; - } - - /** - * Check if the connection is active. - * - * @return True if the connection is active. - */ - public synchronized boolean isActive() { - return this.numThreads > 0; - } - - /** - * Check if the connection is closed. - * - * @return If the connection is closed. - */ - public synchronized boolean isClosed() { - return this.closed; - } - - /** - * Check if the connection can be used. It checks if the connection is used by - * another thread or already closed. - * - * @return True if the connection can be used. - */ - public synchronized boolean isUsable() { - return !isActive() && !isClosed(); - } - - /** - * Get the connection client. - * - * @return Connection client. - */ - public synchronized ProxyAndInfo<ClientProtocol> getClient() { - this.numThreads++; - return this.client; - } - - /** - * Release this connection. If the connection was closed, close the proxy. - * Otherwise, mark the connection as not used by us anymore. 
- */ - public synchronized void release() { - if (--this.numThreads == 0 && this.closed) { - close(); - } - } - - /** - * We will not use this connection anymore. If it's not being used, we close - * it. Otherwise, we let release() do it once we are done with it. - */ - public synchronized void close() { - this.closed = true; - if (this.numThreads == 0) { - ClientProtocol proxy = this.client.getProxy(); - // Nobody should be using this anymore so it should close right away - RPC.stopProxy(proxy); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java deleted file mode 100644 index 594f489..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ /dev/null @@ -1,436 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.TreeMap; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Time; -import org.eclipse.jetty.util.ajax.JSON; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements a pool of connections for the {@link Router} to be able to open - * many connections to many Namenodes. - */ -public class ConnectionManager { - - private static final Logger LOG = - LoggerFactory.getLogger(ConnectionManager.class); - - /** Number of parallel new connections to create. */ - protected static final int MAX_NEW_CONNECTIONS = 100; - - /** Minimum amount of active connections: 50%. */ - protected static final float MIN_ACTIVE_RATIO = 0.5f; - - - /** Configuration for the connection manager, pool and sockets. */ - private final Configuration conf; - - /** Min number of connections per user + nn. */ - private final int minSize = 1; - /** Max number of connections per user + nn. */ - private final int maxSize; - - /** How often we close a pool for a particular user + nn. */ - private final long poolCleanupPeriodMs; - /** How often we close a connection in a pool. 
*/ - private final long connectionCleanupPeriodMs; - - /** Map of connection pools, one pool per user + NN. */ - private final Map<ConnectionPoolId, ConnectionPool> pools; - /** Lock for accessing pools. */ - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock readLock = readWriteLock.readLock(); - private final Lock writeLock = readWriteLock.writeLock(); - - /** Queue for creating new connections. */ - private final BlockingQueue<ConnectionPool> creatorQueue = - new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS); - /** Create new connections asynchronously. */ - private final ConnectionCreator creator; - /** Periodic executor to remove stale connection pools. */ - private final ScheduledThreadPoolExecutor cleaner = - new ScheduledThreadPoolExecutor(1); - - /** If the connection manager is running. */ - private boolean running = false; - - - /** - * Creates a proxy client connection pool manager. - * - * @param config Configuration for the connections. 
- */ - public ConnectionManager(Configuration config) { - this.conf = config; - - // Configure minimum and maximum connection pools - this.maxSize = this.conf.getInt( - DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE, - DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT); - - // Map with the connections indexed by UGI and Namenode - this.pools = new HashMap<>(); - - // Create connections in a thread asynchronously - this.creator = new ConnectionCreator(creatorQueue); - this.creator.setDaemon(true); - - // Cleanup periods - this.poolCleanupPeriodMs = this.conf.getLong( - DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, - DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT); - LOG.info("Cleaning connection pools every {} seconds", - TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs)); - this.connectionCleanupPeriodMs = this.conf.getLong( - DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, - DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT); - LOG.info("Cleaning connections every {} seconds", - TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs)); - } - - /** - * Start the connection manager. - */ - public void start() { - // Start the thread that creates connections asynchronously - this.creator.start(); - - // Schedule a task to remove stale connection pools and sockets - long recyleTimeMs = Math.min( - poolCleanupPeriodMs, connectionCleanupPeriodMs); - LOG.info("Cleaning every {} seconds", - TimeUnit.MILLISECONDS.toSeconds(recyleTimeMs)); - this.cleaner.scheduleAtFixedRate( - new CleanupTask(), 0, recyleTimeMs, TimeUnit.MILLISECONDS); - - // Mark the manager as running - this.running = true; - } - - /** - * Stop the connection manager by closing all the pools. 
- */ - public void close() { - this.creator.shutdown(); - this.cleaner.shutdown(); - this.running = false; - - writeLock.lock(); - try { - for (ConnectionPool pool : this.pools.values()) { - pool.close(); - } - this.pools.clear(); - } finally { - writeLock.unlock(); - } - } - - /** - * Fetches the next available proxy client in the pool. Each client connection - * is reserved for a single user and cannot be reused until free. - * - * @param ugi User group information. - * @param nnAddress Namenode address for the connection. - * @return Proxy client to connect to nnId as UGI. - * @throws IOException If the connection cannot be obtained. - */ - public ConnectionContext getConnection( - UserGroupInformation ugi, String nnAddress) throws IOException { - - // Check if the manager is shutdown - if (!this.running) { - LOG.error( - "Cannot get a connection to {} because the manager isn't running", - nnAddress); - return null; - } - - // Try to get the pool if created - ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress); - ConnectionPool pool = null; - readLock.lock(); - try { - pool = this.pools.get(connectionId); - } finally { - readLock.unlock(); - } - - // Create the pool if not created before - if (pool == null) { - writeLock.lock(); - try { - pool = this.pools.get(connectionId); - if (pool == null) { - pool = new ConnectionPool( - this.conf, nnAddress, ugi, this.minSize, this.maxSize); - this.pools.put(connectionId, pool); - } - } finally { - writeLock.unlock(); - } - } - - ConnectionContext conn = pool.getConnection(); - - // Add a new connection to the pool if it wasn't usable - if (conn == null || !conn.isUsable()) { - if (!this.creatorQueue.offer(pool)) { - LOG.error("Cannot add more than {} connections at the same time", - MAX_NEW_CONNECTIONS); - } - } - - if (conn != null && conn.isClosed()) { - LOG.error("We got a closed connection from {}", pool); - conn = null; - } - - return conn; - } - - /** - * Get the number of connection pools. 
- * - * @return Number of connection pools. - */ - public int getNumConnectionPools() { - readLock.lock(); - try { - return pools.size(); - } finally { - readLock.unlock(); - } - } - - /** - * Get number of open connections. - * - * @return Number of open connections. - */ - public int getNumConnections() { - int total = 0; - readLock.lock(); - try { - for (ConnectionPool pool : this.pools.values()) { - total += pool.getNumConnections(); - } - } finally { - readLock.unlock(); - } - return total; - } - - /** - * Get number of active connections. - * - * @return Number of active connections. - */ - public int getNumActiveConnections() { - int total = 0; - readLock.lock(); - try { - for (ConnectionPool pool : this.pools.values()) { - total += pool.getNumActiveConnections(); - } - } finally { - readLock.unlock(); - } - return total; - } - - /** - * Get the number of connections to be created. - * - * @return Number of connections to be created. - */ - public int getNumCreatingConnections() { - return this.creatorQueue.size(); - } - - /** - * Get a JSON representation of the connection pool. - * - * @return JSON representation of all the connection pools. - */ - public String getJSON() { - final Map<String, String> info = new TreeMap<>(); - readLock.lock(); - try { - for (Entry<ConnectionPoolId, ConnectionPool> entry : - this.pools.entrySet()) { - ConnectionPoolId connectionPoolId = entry.getKey(); - ConnectionPool pool = entry.getValue(); - info.put(connectionPoolId.toString(), pool.getJSON()); - } - } finally { - readLock.unlock(); - } - return JSON.toString(info); - } - - @VisibleForTesting - Map<ConnectionPoolId, ConnectionPool> getPools() { - return this.pools; - } - - /** - * Clean the unused connections for this pool. - * - * @param pool Connection pool to cleanup. 
- */ - @VisibleForTesting - void cleanup(ConnectionPool pool) { - if (pool.getNumConnections() > pool.getMinSize()) { - // Check if the pool hasn't been active in a while or not 50% are used - long timeSinceLastActive = Time.now() - pool.getLastActiveTime(); - int total = pool.getNumConnections(); - int active = pool.getNumActiveConnections(); - if (timeSinceLastActive > connectionCleanupPeriodMs || - active < MIN_ACTIVE_RATIO * total) { - // Remove and close 1 connection - List<ConnectionContext> conns = pool.removeConnections(1); - for (ConnectionContext conn : conns) { - conn.close(); - } - LOG.debug("Removed connection {} used {} seconds ago. " + - "Pool has {}/{} connections", pool.getConnectionPoolId(), - TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive), - pool.getNumConnections(), pool.getMaxSize()); - } - } - } - - /** - * Removes stale connections not accessed recently from the pool. This is - * invoked periodically. - */ - private class CleanupTask implements Runnable { - - @Override - public void run() { - long currentTime = Time.now(); - List<ConnectionPoolId> toRemove = new LinkedList<>(); - - // Look for stale pools - readLock.lock(); - try { - for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) { - ConnectionPool pool = entry.getValue(); - long lastTimeActive = pool.getLastActiveTime(); - boolean isStale = - currentTime > (lastTimeActive + poolCleanupPeriodMs); - if (lastTimeActive > 0 && isStale) { - // Remove this pool - LOG.debug("Closing and removing stale pool {}", pool); - pool.close(); - ConnectionPoolId poolId = entry.getKey(); - toRemove.add(poolId); - } else { - // Keep this pool but clean connections inside - LOG.debug("Cleaning up {}", pool); - cleanup(pool); - } - } - } finally { - readLock.unlock(); - } - - // Remove stale pools - if (!toRemove.isEmpty()) { - writeLock.lock(); - try { - for (ConnectionPoolId poolId : toRemove) { - pools.remove(poolId); - } - } finally { - writeLock.unlock(); - } - } - } - } - - 
/** - * Thread that creates connections asynchronously. - */ - private static class ConnectionCreator extends Thread { - /** If the creator is running. */ - private boolean running = true; - /** Queue to push work to. */ - private BlockingQueue<ConnectionPool> queue; - - ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) { - super("Connection creator"); - this.queue = blockingQueue; - } - - @Override - public void run() { - while (this.running) { - try { - ConnectionPool pool = this.queue.take(); - try { - int total = pool.getNumConnections(); - int active = pool.getNumActiveConnections(); - if (pool.getNumConnections() < pool.getMaxSize() && - active >= MIN_ACTIVE_RATIO * total) { - ConnectionContext conn = pool.newConnection(); - pool.addConnection(conn); - } else { - LOG.debug("Cannot add more than {} connections to {}", - pool.getMaxSize(), pool); - } - } catch (IOException e) { - LOG.error("Cannot create a new connection", e); - } - } catch (InterruptedException e) { - LOG.error("The connection creator was interrupted"); - this.running = false; - } - } - } - - /** - * Stop this connection creator. 
- */ - public void shutdown() { - this.running = false; - this.interrupt(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java deleted file mode 100644 index 5af8a86..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ /dev/null @@ -1,337 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.net.SocketFactory; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.io.retry.RetryUtils; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Time; -import org.eclipse.jetty.util.ajax.JSON; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Maintains a pool of connections for each User (including tokens) + NN. The - * RPC client maintains a single socket, to achieve throughput similar to a NN, - * each request is multiplexed across multiple sockets/connections from a - * pool. - */ [email protected] [email protected] -public class ConnectionPool { - - private static final Logger LOG = - LoggerFactory.getLogger(ConnectionPool.class); - - - /** Configuration settings for the connection pool. 
*/ - private final Configuration conf; - - /** Identifier for this connection pool. */ - private final ConnectionPoolId connectionPoolId; - /** Namenode this pool connects to. */ - private final String namenodeAddress; - /** User for this connections. */ - private final UserGroupInformation ugi; - - /** Pool of connections. We mimic a COW array. */ - private volatile List<ConnectionContext> connections = new ArrayList<>(); - /** Connection index for round-robin. */ - private final AtomicInteger clientIndex = new AtomicInteger(0); - - /** Min number of connections per user. */ - private final int minSize; - /** Max number of connections per user. */ - private final int maxSize; - - /** The last time a connection was active. */ - private volatile long lastActiveTime = 0; - - - protected ConnectionPool(Configuration config, String address, - UserGroupInformation user, int minPoolSize, int maxPoolSize) - throws IOException { - - this.conf = config; - - // Connection pool target - this.ugi = user; - this.namenodeAddress = address; - this.connectionPoolId = - new ConnectionPoolId(this.ugi, this.namenodeAddress); - - // Set configuration parameters for the pool - this.minSize = minPoolSize; - this.maxSize = maxPoolSize; - - // Add minimum connections to the pool - for (int i=0; i<this.minSize; i++) { - ConnectionContext newConnection = newConnection(); - this.connections.add(newConnection); - } - LOG.debug("Created connection pool \"{}\" with {} connections", - this.connectionPoolId, this.minSize); - } - - /** - * Get the maximum number of connections allowed in this pool. - * - * @return Maximum number of connections. - */ - protected int getMaxSize() { - return this.maxSize; - } - - /** - * Get the minimum number of connections in this pool. - * - * @return Minimum number of connections. - */ - protected int getMinSize() { - return this.minSize; - } - - /** - * Get the connection pool identifier. - * - * @return Connection pool identifier. 
- */ - protected ConnectionPoolId getConnectionPoolId() { - return this.connectionPoolId; - } - - /** - * Return the next connection round-robin. - * - * @return Connection context. - */ - protected ConnectionContext getConnection() { - - this.lastActiveTime = Time.now(); - - // Get a connection from the pool following round-robin - ConnectionContext conn = null; - List<ConnectionContext> tmpConnections = this.connections; - int size = tmpConnections.size(); - int threadIndex = this.clientIndex.getAndIncrement(); - for (int i=0; i<size; i++) { - int index = (threadIndex + i) % size; - conn = tmpConnections.get(index); - if (conn != null && conn.isUsable()) { - return conn; - } - } - - // We return a connection even if it's active - return conn; - } - - /** - * Add a connection to the current pool. It uses a Copy-On-Write approach. - * - * @param conn New connection to add to the pool. - */ - public synchronized void addConnection(ConnectionContext conn) { - List<ConnectionContext> tmpConnections = new ArrayList<>(this.connections); - tmpConnections.add(conn); - this.connections = tmpConnections; - - this.lastActiveTime = Time.now(); - } - - /** - * Remove connections from the current pool. - * - * @param num Number of connections to remove. - * @return Removed connections. - */ - public synchronized List<ConnectionContext> removeConnections(int num) { - List<ConnectionContext> removed = new LinkedList<>(); - - // Remove and close the last connection - List<ConnectionContext> tmpConnections = new ArrayList<>(); - for (int i=0; i<this.connections.size(); i++) { - ConnectionContext conn = this.connections.get(i); - if (i < this.minSize || i < this.connections.size() - num) { - tmpConnections.add(conn); - } else { - removed.add(conn); - } - } - this.connections = tmpConnections; - - return removed; - } - - /** - * Close the connection pool. 
- */ - protected synchronized void close() { - long timeSinceLastActive = TimeUnit.MILLISECONDS.toSeconds( - Time.now() - getLastActiveTime()); - LOG.debug("Shutting down connection pool \"{}\" used {} seconds ago", - this.connectionPoolId, timeSinceLastActive); - - for (ConnectionContext connection : this.connections) { - connection.close(); - } - this.connections.clear(); - } - - /** - * Number of connections in the pool. - * - * @return Number of connections. - */ - protected int getNumConnections() { - return this.connections.size(); - } - - /** - * Number of active connections in the pool. - * - * @return Number of active connections. - */ - protected int getNumActiveConnections() { - int ret = 0; - - List<ConnectionContext> tmpConnections = this.connections; - for (ConnectionContext conn : tmpConnections) { - if (conn.isActive()) { - ret++; - } - } - return ret; - } - - /** - * Get the last time the connection pool was used. - * - * @return Last time the connection pool was used. - */ - protected long getLastActiveTime() { - return this.lastActiveTime; - } - - @Override - public String toString() { - return this.connectionPoolId.toString(); - } - - /** - * JSON representation of the connection pool. - * - * @return String representation of the JSON. - */ - public String getJSON() { - final Map<String, String> info = new LinkedHashMap<>(); - info.put("active", Integer.toString(getNumActiveConnections())); - info.put("total", Integer.toString(getNumConnections())); - if (LOG.isDebugEnabled()) { - List<ConnectionContext> tmpConnections = this.connections; - for (int i=0; i<tmpConnections.size(); i++) { - ConnectionContext connection = tmpConnections.get(i); - info.put(i + " active", Boolean.toString(connection.isActive())); - info.put(i + " closed", Boolean.toString(connection.isClosed())); - } - } - return JSON.toString(info); - } - - /** - * Create a new proxy wrapper for a client NN connection. 
- * @return Proxy for the target ClientProtocol that contains the user's - * security context. - * @throws IOException - */ - public ConnectionContext newConnection() throws IOException { - return newConnection(this.conf, this.namenodeAddress, this.ugi); - } - - /** - * Creates a proxy wrapper for a client NN connection. Each proxy contains - * context for a single user/security context. To maximize throughput it is - * recommended to use multiple connection per user+server, allowing multiple - * writes and reads to be dispatched in parallel. - * - * @param conf Configuration for the connection. - * @param nnAddress Address of server supporting the ClientProtocol. - * @param ugi User context. - * @return Proxy for the target ClientProtocol that contains the user's - * security context. - * @throws IOException If it cannot be created. - */ - protected static ConnectionContext newConnection(Configuration conf, - String nnAddress, UserGroupInformation ugi) - throws IOException { - RPC.setProtocolEngine( - conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); - - final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy( - conf, - HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, - HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, - HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY, - HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT, - HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME); - - SocketFactory factory = SocketFactory.getDefault(); - if (UserGroupInformation.isSecurityEnabled()) { - SaslRpcServer.init(conf); - } - InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); - final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); - ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( - ClientNamenodeProtocolPB.class, version, socket, ugi, conf, - factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); - ClientProtocol client = new ClientNamenodeProtocolTranslatorPB(proxy); - Text dtService = 
SecurityUtil.buildTokenService(socket); - - ProxyAndInfo<ClientProtocol> clientProxy = - new ProxyAndInfo<ClientProtocol>(client, dtService, socket); - ConnectionContext connection = new ConnectionContext(clientProxy); - return connection; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java deleted file mode 100644 index 6e1ee9a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; - -/** - * Identifier for a connection for a user to a namenode. - */ [email protected] [email protected] -public class ConnectionPoolId implements Comparable<ConnectionPoolId> { - - /** Namenode identifier. */ - private final String nnId; - /** Information about the user. */ - private final UserGroupInformation ugi; - - /** - * New connection pool identifier. - * - * @param ugi Information of the user issuing the request. - * @param nnId Namenode address with port. 
- */ - public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) { - this.nnId = nnId; - this.ugi = ugi; - } - - @Override - public int hashCode() { - int hash = new HashCodeBuilder(17, 31) - .append(this.nnId) - .append(this.ugi.toString()) - .append(this.getTokenIds()) - .toHashCode(); - return hash; - } - - @Override - public boolean equals(Object o) { - if (o instanceof ConnectionPoolId) { - ConnectionPoolId other = (ConnectionPoolId) o; - if (!this.nnId.equals(other.nnId)) { - return false; - } - if (!this.ugi.toString().equals(other.ugi.toString())) { - return false; - } - String thisTokens = this.getTokenIds().toString(); - String otherTokens = other.getTokenIds().toString(); - return thisTokens.equals(otherTokens); - } - return false; - } - - @Override - public String toString() { - return this.ugi + " " + this.getTokenIds() + "->" + this.nnId; - } - - @Override - public int compareTo(ConnectionPoolId other) { - int ret = this.nnId.compareTo(other.nnId); - if (ret == 0) { - ret = this.ugi.toString().compareTo(other.ugi.toString()); - } - if (ret == 0) { - String thisTokens = this.getTokenIds().toString(); - String otherTokens = other.getTokenIds().toString(); - ret = thisTokens.compareTo(otherTokens); - } - return ret; - } - - @VisibleForTesting - UserGroupInformation getUgi() { - return this.ugi; - } - - /** - * Get the token identifiers for this connection. - * @return List with the token identifiers. - */ - private List<String> getTokenIds() { - List<String> tokenIds = new ArrayList<>(); - Collection<Token<? extends TokenIdentifier>> tokens = this.ugi.getTokens(); - for (Token<? 
extends TokenIdentifier> token : tokens) { - byte[] tokenIdBytes = token.getIdentifier(); - String tokenId = Arrays.toString(tokenIdBytes); - tokenIds.add(tokenId); - } - Collections.sort(tokenIds); - return tokenIds; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java deleted file mode 100644 index a2ac258..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.util.ExitUtil.terminate; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.service.CompositeService.CompositeServiceShutdownHook; -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tool to start the {@link Router} for Router-based federation. - */ -public final class DFSRouter { - - private static final Logger LOG = LoggerFactory.getLogger(DFSRouter.class); - - - /** Usage string for help message. */ - private static final String USAGE = "Usage: hdfs dfsrouter"; - - /** Priority of the Router shutdown hook. */ - public static final int SHUTDOWN_HOOK_PRIORITY = 30; - - - private DFSRouter() { - // This is just a class to trigger the Router - } - - /** - * Main run loop for the router. - * - * @param argv parameters. 
- */ - public static void main(String[] argv) { - if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) { - System.exit(0); - } - - try { - StringUtils.startupShutdownMessage(Router.class, argv, LOG); - - Router router = new Router(); - - ShutdownHookManager.get().addShutdownHook( - new CompositeServiceShutdownHook(router), SHUTDOWN_HOOK_PRIORITY); - - Configuration conf = new HdfsConfiguration(); - router.init(conf); - router.start(); - } catch (Throwable e) { - LOG.error("Failed to start router", e); - terminate(1, e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java deleted file mode 100644 index d2b2d50..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;

/**
 * Erasure Coding part of the {@link RouterRpcServer}: implements the
 * Erasure Coding calls of
 * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} by forwarding them
 * either to every namespace or to the locations of a path.
 */
public class ErasureCoding {

  /** RPC server to receive client calls. */
  private final RouterRpcServer rpcServer;
  /** RPC clients to connect to the Namenodes. */
  private final RouterRpcClient rpcClient;
  /** Interface to identify the active NN for a nameservice or blockpool ID. */
  private final ActiveNamenodeResolver namenodeResolver;


  public ErasureCoding(RouterRpcServer server) {
    this.rpcServer = server;
    this.rpcClient = server.getRPCClient();
    this.namenodeResolver = this.rpcClient.getNamenodeResolver();
  }

  /** Collect the policies of every namespace and merge them. */
  public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
      throws IOException {
    rpcServer.checkOperation(OperationCategory.READ);

    RemoteMethod call = new RemoteMethod("getErasureCodingPolicies");
    Set<FederationNamespaceInfo> namespaces = namenodeResolver.getNamespaces();
    Map<FederationNamespaceInfo, ErasureCodingPolicyInfo[]> perNamespace =
        rpcClient.invokeConcurrent(
            namespaces, call, true, false, ErasureCodingPolicyInfo[].class);
    return merge(perNamespace, ErasureCodingPolicyInfo.class);
  }

  /** Collect the codecs of every namespace into a single map. */
  public Map<String, String> getErasureCodingCodecs() throws IOException {
    rpcServer.checkOperation(OperationCategory.READ);

    RemoteMethod call = new RemoteMethod("getErasureCodingCodecs");
    Set<FederationNamespaceInfo> namespaces = namenodeResolver.getNamespaces();
    @SuppressWarnings("rawtypes")
    Map<FederationNamespaceInfo, Map> rawResults =
        rpcClient.invokeConcurrent(
            namespaces, call, true, false, Map.class);

    // Go through Object to turn the raw value type into Map<String, String>
    Object untyped = rawResults;
    @SuppressWarnings("unchecked")
    Map<FederationNamespaceInfo, Map<String, String>> typedResults =
        (Map<FederationNamespaceInfo, Map<String, String>>)untyped;

    Map<String, String> combined = new HashMap<>();
    Collection<Map<String, String>> codecsPerNamespace = typedResults.values();
    for (Map<String, String> namespaceCodecs : codecsPerNamespace) {
      combined.putAll(namespaceCodecs);
    }
    return combined;
  }

  /** Add the policies on every namespace and merge the responses. */
  public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
      ErasureCodingPolicy[] policies) throws IOException {
    rpcServer.checkOperation(OperationCategory.WRITE);

    Class<?>[] paramTypes = new Class<?>[] {ErasureCodingPolicy[].class};
    Object[] paramValues = new Object[] {policies};
    RemoteMethod call =
        new RemoteMethod("addErasureCodingPolicies", paramTypes, paramValues);
    Set<FederationNamespaceInfo> namespaces = namenodeResolver.getNamespaces();
    Map<FederationNamespaceInfo, AddErasureCodingPolicyResponse[]> responses =
        rpcClient.invokeConcurrent(
            namespaces, call, true, false,
            AddErasureCodingPolicyResponse[].class);

    return merge(responses, AddErasureCodingPolicyResponse.class);
  }

  public void removeErasureCodingPolicy(String ecPolicyName)
      throws IOException {
    rpcServer.checkOperation(OperationCategory.WRITE);
    invokePolicyCallOnAllNamespaces("removeErasureCodingPolicy", ecPolicyName);
  }

  public void disableErasureCodingPolicy(String ecPolicyName)
      throws IOException {
    rpcServer.checkOperation(OperationCategory.WRITE);
    invokePolicyCallOnAllNamespaces("disableErasureCodingPolicy", ecPolicyName);
  }

  public void enableErasureCodingPolicy(String ecPolicyName)
      throws IOException {
    rpcServer.checkOperation(OperationCategory.WRITE);
    invokePolicyCallOnAllNamespaces("enableErasureCodingPolicy", ecPolicyName);
  }

  /** Invoke a one-String-argument policy method on every namespace. */
  private void invokePolicyCallOnAllNamespaces(
      String methodName, String ecPolicyName) throws IOException {
    RemoteMethod call = new RemoteMethod(methodName,
        new Class<?>[] {String.class}, ecPolicyName);
    Set<FederationNamespaceInfo> namespaces = namenodeResolver.getNamespaces();
    rpcClient.invokeConcurrent(namespaces, call, true, false);
  }

  /** Ask the locations of the path, one after another, for its policy. */
  public ErasureCodingPolicy getErasureCodingPolicy(String src)
      throws IOException {
    rpcServer.checkOperation(OperationCategory.READ);

    final List<RemoteLocation> targets =
        rpcServer.getLocationsForPath(src, true);
    RemoteMethod call = new RemoteMethod("getErasureCodingPolicy",
        new Class<?>[] {String.class}, new RemoteParam());
    ErasureCodingPolicy policy = rpcClient.invokeSequential(
        targets, call, null, null);
    return policy;
  }

  public void setErasureCodingPolicy(String src, String ecPolicyName)
      throws IOException {
    rpcServer.checkOperation(OperationCategory.WRITE);

    final List<RemoteLocation> targets =
        rpcServer.getLocationsForPath(src, true);
    Class<?>[] paramTypes = new Class<?>[] {String.class, String.class};
    RemoteMethod call = new RemoteMethod("setErasureCodingPolicy",
        paramTypes, new RemoteParam(), ecPolicyName);
    rpcClient.invokeSequential(targets, call, null, null);
  }

  public void unsetErasureCodingPolicy(String src) throws IOException {
    rpcServer.checkOperation(OperationCategory.WRITE);

    final List<RemoteLocation> targets =
        rpcServer.getLocationsForPath(src, true);
    RemoteMethod call = new RemoteMethod("unsetErasureCodingPolicy",
        new Class<?>[] {String.class}, new RemoteParam());
    rpcClient.invokeSequential(targets, call, null, null);
  }

  /** Sum the block group statistics reported by every namespace. */
  public ECBlockGroupStats getECBlockGroupStats() throws IOException {
    rpcServer.checkOperation(OperationCategory.READ);

    RemoteMethod call = new RemoteMethod("getECBlockGroupStats");
    Set<FederationNamespaceInfo> namespaces = namenodeResolver.getNamespaces();
    Map<FederationNamespaceInfo, ECBlockGroupStats> statsPerNamespace =
        rpcClient.invokeConcurrent(
            namespaces, call, true, false, ECBlockGroupStats.class);

    // Running totals across namespaces
    long lowRedundancy = 0;
    long corrupt = 0;
    long missing = 0;
    long bytesInFuture = 0;
    long pendingDeletion = 0;
    for (ECBlockGroupStats one : statsPerNamespace.values()) {
      lowRedundancy += one.getLowRedundancyBlockGroups();
      corrupt += one.getCorruptBlockGroups();
      missing += one.getMissingBlockGroups();
      bytesInFuture += one.getBytesInFutureBlockGroups();
      pendingDeletion += one.getPendingDeletionBlocks();
    }
    return new ECBlockGroupStats(lowRedundancy, corrupt,
        missing, bytesInFuture, pendingDeletion);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java deleted file mode 100644 index 3dfd998..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLConnection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.util.VersionInfo;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utilities for managing HDFS federation.
 */
public final class FederationUtil {

  private static final Logger LOG =
      LoggerFactory.getLogger(FederationUtil.class);

  /** Connect and read timeout used when querying a JMX endpoint. */
  private static final int JMX_TIMEOUT_MS = 5 * 1000;

  private FederationUtil() {
    // Utility Class
  }

  /**
   * Get a JMX data from a web endpoint.
   *
   * <p>Failures are logged and never propagated to the caller.
   *
   * @param beanQuery JMX bean.
   * @param webAddress Web address of the JMX endpoint, either {@code host}
   *                   or {@code host:port}.
   * @return JSON array found under the "beans" key of the response, or
   *         {@code null} if the endpoint could not be read or its output
   *         could not be parsed.
   */
  public static JSONArray getJmx(String beanQuery, String webAddress) {
    JSONArray ret = null;
    try {
      String host = webAddress;
      int port = -1;
      if (webAddress.indexOf(":") > 0) {
        String[] webAddressSplit = webAddress.split(":");
        host = webAddressSplit[0];
        port = Integer.parseInt(webAddressSplit[1]);
      }
      URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery);
      URLConnection conn = jmxURL.openConnection();
      conn.setConnectTimeout(JMX_TIMEOUT_MS);
      conn.setReadTimeout(JMX_TIMEOUT_MS);
      String jmxOutput = readJmxOutput(conn);

      // Parse JSON
      JSONObject json = new JSONObject(jmxOutput);
      ret = json.getJSONArray("beans");
    } catch (IOException e) {
      LOG.error("Cannot read JMX bean {} from server {}: {}",
          beanQuery, webAddress, e.getMessage());
    } catch (JSONException e) {
      LOG.error("Cannot parse JMX output for {} from server {}: {}",
          beanQuery, webAddress, e.getMessage());
    } catch (Exception e) {
      // The exception is passed as the trailing argument without a matching
      // placeholder so that the logger records it with its stack trace.
      LOG.error("Cannot parse JMX output for {} from server {}",
          beanQuery, webAddress, e);
    }
    return ret;
  }

  /**
   * Read the whole response of a connection as a single string. Lines are
   * concatenated without their line terminators.
   *
   * @param conn Connection to read from.
   * @return Content of the response.
   * @throws IOException If the response cannot be read or closed.
   */
  private static String readJmxOutput(URLConnection conn) throws IOException {
    // try-with-resources closes the streams even if reading fails half way.
    try (InputStream in = conn.getInputStream();
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(in, "UTF-8"))) {
      StringBuilder sb = new StringBuilder();
      String line;
      while ((line = reader.readLine()) != null) {
        sb.append(line);
      }
      return sb.toString();
    }
  }

  /**
   * Fetch the Hadoop version string for this jar.
   *
   * @return Hadoop version string, e.g., 3.0.1.
   */
  public static String getVersion() {
    return VersionInfo.getVersion();
  }

  /**
   * Fetch the build/compile information for this jar.
   *
   * @return String Compilation info in the form
   *         "&lt;date&gt; by &lt;user&gt; from &lt;branch&gt;".
   */
  public static String getCompileInfo() {
    return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from "
        + VersionInfo.getBranch();
  }

  /**
   * Create an instance of an interface with a constructor using a context.
   *
   * <p>If {@code contextClass} is {@code null} the public no-argument
   * constructor is used. Otherwise the public constructor taking
   * {@code (Configuration, contextClass)} is used and receives {@code conf}
   * and {@code context}.
   *
   * @param conf Configuration passed to the constructor when a context class
   *             is given.
   * @param context Context object to pass to the instance.
   * @param contextClass Type of the context passed to the constructor.
   * @param clazz Class of the object to return.
   * @return New instance of the specified class, or {@code null} if it could
   *         not be instantiated through reflection (the error is logged).
   */
  private static <T, R> T newInstance(final Configuration conf,
      final R context, final Class<R> contextClass, final Class<T> clazz) {
    try {
      if (contextClass == null) {
        // Default constructor if no context
        Constructor<T> constructor = clazz.getConstructor();
        return constructor.newInstance();
      } else {
        // Constructor with context
        Constructor<T> constructor = clazz.getConstructor(
            Configuration.class, contextClass);
        return constructor.newInstance(conf, context);
      }
    } catch (ReflectiveOperationException e) {
      LOG.error("Could not instantiate: {}", clazz.getSimpleName(), e);
      return null;
    }
  }

  /**
   * Creates an instance of a FileSubclusterResolver from the configuration.
   *
   * @param conf Configuration that defines the file resolver class.
   * @param router Router service.
   * @return New file subcluster resolver.
   */
  public static FileSubclusterResolver newFileSubclusterResolver(
      Configuration conf, Router router) {
    Class<? extends FileSubclusterResolver> clazz = conf.getClass(
        DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
        DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
        FileSubclusterResolver.class);
    return newInstance(conf, router, Router.class, clazz);
  }

  /**
   * Creates an instance of an ActiveNamenodeResolver from the configuration.
   *
   * @param conf Configuration that defines the namenode resolver class.
   * @param stateStore State store passed to class constructor.
   * @return New active namenode resolver.
   */
  public static ActiveNamenodeResolver newActiveNamenodeResolver(
      Configuration conf, StateStoreService stateStore) {
    Class<? extends ActiveNamenodeResolver> clazz = conf.getClass(
        DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
        DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT,
        ActiveNamenodeResolver.class);
    return newInstance(conf, stateStore, StateStoreService.class, clazz);
  }

  /**
   * Check if the given path is the same as, or located under, the parent
   * path. The match is done on whole path components, so "/ab" is not
   * considered to be under "/a".
   *
   * @param path Path to be check.
   * @param parent Parent path.
   * @return True if the path equals the parent or the parent is an ancestor
   *         of the path.
   */
  public static boolean isParentEntry(final String path, final String parent) {
    if (!path.startsWith(parent)) {
      return false;
    }

    if (path.equals(parent)) {
      return true;
    }

    // From here on path is strictly longer than parent, so the charAt() below
    // is always in bounds. A parent that already ends with the separator
    // (the root "/" or e.g. "/a/") is a component boundary on its own;
    // otherwise the next character of path must be the separator.
    return parent.endsWith(Path.SEPARATOR)
        || path.charAt(parent.length()) == Path.SEPARATOR_CHAR;
  }
}
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
