http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java new file mode 100644 index 0000000..4d76c89 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import static org.apache.hadoop.util.Time.monotonicNow; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.net.HostAndPort; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The local subcluster (where the writer is) should be tried first. The writer + * is defined from the RPC query received in the RPC server. 
+ */ +public class LocalResolver implements OrderedResolver { + + private static final Logger LOG = + LoggerFactory.getLogger(LocalResolver.class); + + /** Configuration key to set the minimum time to update the local cache.*/ + public static final String MIN_UPDATE_PERIOD_KEY = + RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period"; + /** 10 seconds by default. */ + private static final long MIN_UPDATE_PERIOD_DEFAULT = + TimeUnit.SECONDS.toMillis(10); + + + /** Router service. */ + private final Router router; + /** Minimum update time. */ + private final long minUpdateTime; + + /** Node IP -> Subcluster. */ + private Map<String, String> nodeSubcluster = null; + /** Last time the subcluster map was updated. */ + private long lastUpdated; + + + public LocalResolver(final Configuration conf, final Router routerService) { + this.minUpdateTime = conf.getTimeDuration( + MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT, + TimeUnit.MILLISECONDS); + this.router = routerService; + } + + /** + * Get the local name space. This relies on the RPC Server to get the address + * from the client. + * + * TODO we only support DN and NN locations, we need to add others like + * Resource Managers. + * + * @param path Path ignored by this policy. + * @param loc Federated location with multiple destinations. + * @return Local name space. Null if we don't know about this machine. + */ + @Override + public String getFirstNamespace(final String path, final PathLocation loc) { + String localSubcluster = null; + String clientAddr = getClientAddr(); + Map<String, String> nodeToSubcluster = getSubclusterMappings(); + if (nodeToSubcluster != null) { + localSubcluster = nodeToSubcluster.get(clientAddr); + if (localSubcluster != null) { + LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster); + } else { + LOG.error("Cannot get local namespace for {}", clientAddr); + } + } else { + LOG.error("Cannot get node mapping when resolving {} at {} from {}", + path, loc, clientAddr); + } + return localSubcluster; + } + + @VisibleForTesting + String getClientAddr() { + return Server.getRemoteAddress(); + } + + /** + * Get the mapping from nodes to subcluster. It gets this mapping from the + * subclusters through expensive calls (e.g., RPC) and uses caching to avoid + * too many calls. The cache might be updated asynchronously to reduce + * latency. + * + * @return Node IP -> Subcluster. + */ + @VisibleForTesting + synchronized Map<String, String> getSubclusterMappings() { + if (nodeSubcluster == null || + (monotonicNow() - lastUpdated) > minUpdateTime) { + // Fetch the mapping asynchronously + Thread updater = new Thread(new Runnable() { + @Override + public void run() { + Map<String, String> mapping = new HashMap<>(); + + Map<String, String> dnSubcluster = getDatanodesSubcluster(); + if (dnSubcluster != null) { + mapping.putAll(dnSubcluster); + } + + Map<String, String> nnSubcluster = getNamenodesSubcluster(); + if (nnSubcluster != null) { + mapping.putAll(nnSubcluster); + } + nodeSubcluster = mapping; + lastUpdated = monotonicNow(); + } + }); + updater.start(); + + // Wait until initialized + if (nodeSubcluster == null) { + try { + LOG.debug("Wait to get the mapping for the first time"); + updater.join(); + } catch (InterruptedException e) { + LOG.error("Cannot wait for the updater to finish"); + } + } + } + return nodeSubcluster; + } + + /** + * Get the Datanode mapping from the subclusters from the Namenodes. 
This + * needs to be done as a privileged action to use the user for the Router and + * not the one from the client in the RPC call. + * + * @return DN IP -> Subcluster. + */ + private Map<String, String> getDatanodesSubcluster() { + + final RouterRpcServer rpcServer = getRpcServer(); + if (rpcServer == null) { + LOG.error("Cannot access the Router RPC server"); + return null; + } + + Map<String, String> ret = new HashMap<>(); + try { + // We need to get the DNs as a privileged user + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + Map<String, DatanodeStorageReport[]> dnMap = loginUser.doAs( + new PrivilegedAction<Map<String, DatanodeStorageReport[]>>() { + @Override + public Map<String, DatanodeStorageReport[]> run() { + try { + return rpcServer.getDatanodeStorageReportMap( + DatanodeReportType.ALL); + } catch (IOException e) { + LOG.error("Cannot get the datanodes from the RPC server", e); + return null; + } + } + }); + for (Entry<String, DatanodeStorageReport[]> entry : dnMap.entrySet()) { + String nsId = entry.getKey(); + DatanodeStorageReport[] dns = entry.getValue(); + for (DatanodeStorageReport dn : dns) { + DatanodeInfo dnInfo = dn.getDatanodeInfo(); + String ipAddr = dnInfo.getIpAddr(); + ret.put(ipAddr, nsId); + } + } + } catch (IOException e) { + LOG.error("Cannot get Datanodes from the Namenodes: {}", e.getMessage()); + } + return ret; + } + + /** + * Get the Namenode mapping from the subclusters from the Membership store. As + * the Routers are usually co-located with Namenodes, we also check for the + * local address for this Router here. + * + * @return NN IP -> Subcluster. + */ + private Map<String, String> getNamenodesSubcluster() { + + final MembershipStore membershipStore = getMembershipStore(); + if (membershipStore == null) { + LOG.error("Cannot access the Membership store"); + return null; + } + + // Manage requests from this hostname (127.0.0.1) + String localIp = "127.0.0.1"; + String localHostname = localIp; + try { + localHostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Cannot get local host name"); + } + + Map<String, String> ret = new HashMap<>(); + try { + // Get the values from the store + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(); + GetNamenodeRegistrationsResponse response = + membershipStore.getNamenodeRegistrations(request); + final List<MembershipState> nns = response.getNamenodeMemberships(); + for (MembershipState nn : nns) { + try { + String nsId = nn.getNameserviceId(); + String rpcAddress = nn.getRpcAddress(); + String hostname = HostAndPort.fromString(rpcAddress).getHostText(); + ret.put(hostname, nsId); + if (hostname.equals(localHostname)) { + ret.put(localIp, nsId); + } + + InetAddress addr = InetAddress.getByName(hostname); + String ipAddr = addr.getHostAddress(); + ret.put(ipAddr, nsId); + } catch (Exception e) { + LOG.error("Cannot get address for {}: {}", nn, e.getMessage()); + } + } + } catch (IOException ioe) { + LOG.error("Cannot get Namenodes from the State Store: {}", + ioe.getMessage()); + } + return ret; + } + + /** + * Get the Router RPC server. + * + * @return Router RPC server. Null if not possible. + */ + private RouterRpcServer getRpcServer() { + if (this.router == null) { + return null; + } + return router.getRpcServer(); + } + + /** + * Get the Membership store. + * + * @return Membership store. 
+ */ + private MembershipStore getMembershipStore() { + StateStoreService stateStore = router.getStateStore(); + if (stateStore == null) { + return null; + } + return stateStore.getRegisteredRecordStore(MembershipStore.class); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java new file mode 100644 index 0000000..3a3ccf7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; + + +/** + * Policy that decides which should be the first location accessed given + * multiple destinations. + */ +public interface OrderedResolver { + + /** + * Get the first namespace based on this resolver approach. + * + * @param path Path to check. + * @param loc Federated location with multiple destinations. + * @return First namespace out of the locations. + */ + String getFirstNamespace(String path, PathLocation loc); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java new file mode 100644 index 0000000..022aa48 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Order the destinations randomly. + */ +public class RandomResolver implements OrderedResolver { + + private static final Logger LOG = + LoggerFactory.getLogger(RandomResolver.class); + + + /** Random number generator. */ + private static final Random RANDOM = new Random(); + + /** + * Get a random name space from the path. + * + * @param path Path ignored by this policy. + * @param loc Federated location with multiple destinations. + * @return Random name space. + */ + public String getFirstNamespace(final String path, final PathLocation loc) { + if (loc == null) { + return null; + } + Set<String> namespaces = loc.getNamespaces(); + if (namespaces == null || namespaces.isEmpty()) { + LOG.error("Cannot get namespaces for {}", loc); + return null; + } + List<String> nssList = new ArrayList<>(namespaces); + int index = RANDOM.nextInt(nssList.size()); + return nssList.get(index); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java new file mode 100644 index 0000000..f90152f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A federated location can be resolved to multiple subclusters. This package + * takes care of the order in which this multiple destinations should be used. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java new file mode 100644 index 0000000..d8be9e3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/package-info.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The resolver package contains independent data resolvers used in HDFS + * federation. The data resolvers collect data from the cluster, including from + * the state store. The resolvers expose APIs used by HDFS federation to collect + * aggregated, cached data for use in Real-time request processing. The + * resolvers are perf-sensitive and are used in the flow of the + * {@link RouterRpcServer} request path. + * <p> + * The principal resolvers are: + * <ul> + * <li>{@link ActiveNamenodeResolver} Real-time interface for locating the most + * recently active NN for a nameservice. + * <li>{@link FileSubclusterResolver} Real-time interface for determining the NN + * and local file path for a given file/folder based on the global namespace + * path. 
+ * </ul> + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.resolver; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java new file mode 100644 index 0000000..1d27b51 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.ipc.RPC; + +/** + * Context to track a connection in a {@link ConnectionPool}. When a client uses + * a connection, it increments a counter to mark it as active. Once the client + * is done with the connection, it decreases the counter. It also takes care of + * closing the connection once it is not active. + */ +public class ConnectionContext { + + /** Client for the connection. */ + private final ProxyAndInfo<ClientProtocol> client; + /** How many threads are using this connection. */ + private int numThreads = 0; + /** If the connection is closed. */ + private boolean closed = false; + + + public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) { + this.client = connection; + } + + /** + * Check if the connection is active. + * + * @return True if the connection is active. + */ + public synchronized boolean isActive() { + return this.numThreads > 0; + } + + /** + * Check if the connection is closed. + * + * @return If the connection is closed. + */ + public synchronized boolean isClosed() { + return this.closed; + } + + /** + * Check if the connection can be used. It checks if the connection is used by + * another thread or already closed. + * + * @return True if the connection can be used. + */ + public synchronized boolean isUsable() { + return !isActive() && !isClosed(); + } + + /** + * Get the connection client. + * + * @return Connection client. + */ + public synchronized ProxyAndInfo<ClientProtocol> getClient() { + this.numThreads++; + return this.client; + } + + /** + * Release this connection. 
If the connection was closed, close the proxy. + * Otherwise, mark the connection as not used by us anymore. + */ + public synchronized void release() { + if (--this.numThreads == 0 && this.closed) { + close(); + } + } + + /** + * We will not use this connection anymore. If it's not being used, we close + * it. Otherwise, we let release() do it once we are done with it. + */ + public synchronized void close() { + this.closed = true; + if (this.numThreads == 0) { + ClientProtocol proxy = this.client.getProxy(); + // Nobody should be using this anymore so it should close right away + RPC.stopProxy(proxy); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java new file mode 100644 index 0000000..97c6403 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.eclipse.jetty.util.ajax.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements a pool of connections for the {@link Router} to be able to open + * many connections to many Namenodes. + */ +public class ConnectionManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ConnectionManager.class); + + /** Number of parallel new connections to create. */ + protected static final int MAX_NEW_CONNECTIONS = 100; + + /** Minimum amount of active connections: 50%. 
*/ + protected static final float MIN_ACTIVE_RATIO = 0.5f; + + + /** Configuration for the connection manager, pool and sockets. */ + private final Configuration conf; + + /** Min number of connections per user + nn. */ + private final int minSize = 1; + /** Max number of connections per user + nn. */ + private final int maxSize; + + /** How often we close a pool for a particular user + nn. */ + private final long poolCleanupPeriodMs; + /** How often we close a connection in a pool. */ + private final long connectionCleanupPeriodMs; + + /** Map of connection pools, one pool per user + NN. */ + private final Map<ConnectionPoolId, ConnectionPool> pools; + /** Lock for accessing pools. */ + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + /** Queue for creating new connections. */ + private final BlockingQueue<ConnectionPool> creatorQueue = + new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS); + /** Create new connections asynchronously. */ + private final ConnectionCreator creator; + /** Periodic executor to remove stale connection pools. */ + private final ScheduledThreadPoolExecutor cleaner = + new ScheduledThreadPoolExecutor(1); + + /** If the connection manager is running. */ + private boolean running = false; + + + /** + * Creates a proxy client connection pool manager. + * + * @param config Configuration for the connections. + */ + public ConnectionManager(Configuration config) { + this.conf = config; + + // Configure minimum and maximum connection pools + this.maxSize = this.conf.getInt( + RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE, + RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT); + + // Map with the connections indexed by UGI and Namenode + this.pools = new HashMap<>(); + + // Create connections in a thread asynchronously + this.creator = new ConnectionCreator(creatorQueue); + this.creator.setDaemon(true); + + // Cleanup periods + this.poolCleanupPeriodMs = this.conf.getLong( + RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, + RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT); + LOG.info("Cleaning connection pools every {} seconds", + TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs)); + this.connectionCleanupPeriodMs = this.conf.getLong( + RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, + RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT); + LOG.info("Cleaning connections every {} seconds", + TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs)); + } + + /** + * Start the connection manager. + */ + public void start() { + // Start the thread that creates connections asynchronously + this.creator.start(); + + // Schedule a task to remove stale connection pools and sockets + long recyleTimeMs = Math.min( + poolCleanupPeriodMs, connectionCleanupPeriodMs); + LOG.info("Cleaning every {} seconds", + TimeUnit.MILLISECONDS.toSeconds(recyleTimeMs)); + this.cleaner.scheduleAtFixedRate( + new CleanupTask(), 0, recyleTimeMs, TimeUnit.MILLISECONDS); + + // Mark the manager as running + this.running = true; + } + + /** + * Stop the connection manager by closing all the pools. 
+ */ + public void close() { + this.creator.shutdown(); + this.cleaner.shutdown(); + this.running = false; + + writeLock.lock(); + try { + for (ConnectionPool pool : this.pools.values()) { + pool.close(); + } + this.pools.clear(); + } finally { + writeLock.unlock(); + } + } + + /** + * Fetches the next available proxy client in the pool. Each client connection + * is reserved for a single user and cannot be reused until free. + * + * @param ugi User group information. + * @param nnAddress Namenode address for the connection. + * @return Proxy client to connect to nnId as UGI. + * @throws IOException If the connection cannot be obtained. + */ + public ConnectionContext getConnection( + UserGroupInformation ugi, String nnAddress) throws IOException { + + // Check if the manager is shutdown + if (!this.running) { + LOG.error( + "Cannot get a connection to {} because the manager isn't running", + nnAddress); + return null; + } + + // Try to get the pool if created + ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress); + ConnectionPool pool = null; + readLock.lock(); + try { + pool = this.pools.get(connectionId); + } finally { + readLock.unlock(); + } + + // Create the pool if not created before + if (pool == null) { + writeLock.lock(); + try { + pool = this.pools.get(connectionId); + if (pool == null) { + pool = new ConnectionPool( + this.conf, nnAddress, ugi, this.minSize, this.maxSize); + this.pools.put(connectionId, pool); + } + } finally { + writeLock.unlock(); + } + } + + ConnectionContext conn = pool.getConnection(); + + // Add a new connection to the pool if it wasn't usable + if (conn == null || !conn.isUsable()) { + if (!this.creatorQueue.offer(pool)) { + LOG.error("Cannot add more than {} connections at the same time", + MAX_NEW_CONNECTIONS); + } + } + + if (conn != null && conn.isClosed()) { + LOG.error("We got a closed connection from {}", pool); + conn = null; + } + + return conn; + } + + /** + * Get the number of connection pools. + * + * @return Number of connection pools. + */ + public int getNumConnectionPools() { + readLock.lock(); + try { + return pools.size(); + } finally { + readLock.unlock(); + } + } + + /** + * Get number of open connections. + * + * @return Number of open connections. + */ + public int getNumConnections() { + int total = 0; + readLock.lock(); + try { + for (ConnectionPool pool : this.pools.values()) { + total += pool.getNumConnections(); + } + } finally { + readLock.unlock(); + } + return total; + } + + /** + * Get number of active connections. + * + * @return Number of active connections. + */ + public int getNumActiveConnections() { + int total = 0; + readLock.lock(); + try { + for (ConnectionPool pool : this.pools.values()) { + total += pool.getNumActiveConnections(); + } + } finally { + readLock.unlock(); + } + return total; + } + + /** + * Get the number of connections to be created. + * + * @return Number of connections to be created. + */ + public int getNumCreatingConnections() { + return this.creatorQueue.size(); + } + + /** + * Get a JSON representation of the connection pool. + * + * @return JSON representation of all the connection pools. 
+ */ + public String getJSON() { + final Map<String, String> info = new TreeMap<>(); + readLock.lock(); + try { + for (Entry<ConnectionPoolId, ConnectionPool> entry : + this.pools.entrySet()) { + ConnectionPoolId connectionPoolId = entry.getKey(); + ConnectionPool pool = entry.getValue(); + info.put(connectionPoolId.toString(), pool.getJSON()); + } + } finally { + readLock.unlock(); + } + return JSON.toString(info); + } + + @VisibleForTesting + Map<ConnectionPoolId, ConnectionPool> getPools() { + return this.pools; + } + + /** + * Clean the unused connections for this pool. + * + * @param pool Connection pool to cleanup. + */ + @VisibleForTesting + void cleanup(ConnectionPool pool) { + if (pool.getNumConnections() > pool.getMinSize()) { + // Check if the pool hasn't been active in a while or not 50% are used + long timeSinceLastActive = Time.now() - pool.getLastActiveTime(); + int total = pool.getNumConnections(); + int active = pool.getNumActiveConnections(); + if (timeSinceLastActive > connectionCleanupPeriodMs || + active < MIN_ACTIVE_RATIO * total) { + // Remove and close 1 connection + List<ConnectionContext> conns = pool.removeConnections(1); + for (ConnectionContext conn : conns) { + conn.close(); + } + LOG.debug("Removed connection {} used {} seconds ago. " + + "Pool has {}/{} connections", pool.getConnectionPoolId(), + TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive), + pool.getNumConnections(), pool.getMaxSize()); + } + } + } + + /** + * Removes stale connections not accessed recently from the pool. This is + * invoked periodically. + */ + private class CleanupTask implements Runnable { + + @Override + public void run() { + long currentTime = Time.now(); + List<ConnectionPoolId> toRemove = new LinkedList<>(); + + // Look for stale pools + readLock.lock(); + try { + for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) { + ConnectionPool pool = entry.getValue(); + long lastTimeActive = pool.getLastActiveTime(); + boolean isStale = + currentTime > (lastTimeActive + poolCleanupPeriodMs); + if (lastTimeActive > 0 && isStale) { + // Remove this pool + LOG.debug("Closing and removing stale pool {}", pool); + pool.close(); + ConnectionPoolId poolId = entry.getKey(); + toRemove.add(poolId); + } else { + // Keep this pool but clean connections inside + LOG.debug("Cleaning up {}", pool); + cleanup(pool); + } + } + } finally { + readLock.unlock(); + } + + // Remove stale pools + if (!toRemove.isEmpty()) { + writeLock.lock(); + try { + for (ConnectionPoolId poolId : toRemove) { + pools.remove(poolId); + } + } finally { + writeLock.unlock(); + } + } + } + } + + /** + * Thread that creates connections asynchronously. + */ + private static class ConnectionCreator extends Thread { + /** If the creator is running. */ + private boolean running = true; + /** Queue to push work to. 
*/ + private BlockingQueue<ConnectionPool> queue; + + ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) { + super("Connection creator"); + this.queue = blockingQueue; + } + + @Override + public void run() { + while (this.running) { + try { + ConnectionPool pool = this.queue.take(); + try { + int total = pool.getNumConnections(); + int active = pool.getNumActiveConnections(); + if (pool.getNumConnections() < pool.getMaxSize() && + active >= MIN_ACTIVE_RATIO * total) { + ConnectionContext conn = pool.newConnection(); + pool.addConnection(conn); + } else { + LOG.debug("Cannot add more than {} connections to {}", + pool.getMaxSize(), pool); + } + } catch (IOException e) { + LOG.error("Cannot create a new connection", e); + } + } catch (InterruptedException e) { + LOG.error("The connection creator was interrupted"); + this.running = false; + } + } + } + + /** + * Stop this connection creator. + */ + public void shutdown() { + this.running = false; + this.interrupt(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java new file mode 100644 index 0000000..5af8a86 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.net.SocketFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryUtils; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.eclipse.jetty.util.ajax.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains a pool of connections for each User (including tokens) + NN. The + * RPC client maintains a single socket; to achieve throughput similar to a NN, + * each request is multiplexed across multiple sockets/connections from a + * pool. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ConnectionPool { + + private static final Logger LOG = + LoggerFactory.getLogger(ConnectionPool.class); + + + /** Configuration settings for the connection pool. */ + private final Configuration conf; + + /** Identifier for this connection pool. */ + private final ConnectionPoolId connectionPoolId; + /** Namenode this pool connects to. */ + private final String namenodeAddress; + /** User for these connections. */ + private final UserGroupInformation ugi; + + /** Pool of connections. We mimic a COW array. */ + private volatile List<ConnectionContext> connections = new ArrayList<>(); + /** Connection index for round-robin. */ + private final AtomicInteger clientIndex = new AtomicInteger(0); + + /** Min number of connections per user. */ + private final int minSize; + /** Max number of connections per user. */ + private final int maxSize; + + /** The last time a connection was active. 
*/ + private volatile long lastActiveTime = 0; + + + protected ConnectionPool(Configuration config, String address, + UserGroupInformation user, int minPoolSize, int maxPoolSize) + throws IOException { + + this.conf = config; + + // Connection pool target + this.ugi = user; + this.namenodeAddress = address; + this.connectionPoolId = + new ConnectionPoolId(this.ugi, this.namenodeAddress); + + // Set configuration parameters for the pool + this.minSize = minPoolSize; + this.maxSize = maxPoolSize; + + // Add minimum connections to the pool + for (int i=0; i<this.minSize; i++) { + ConnectionContext newConnection = newConnection(); + this.connections.add(newConnection); + } + LOG.debug("Created connection pool \"{}\" with {} connections", + this.connectionPoolId, this.minSize); + } + + /** + * Get the maximum number of connections allowed in this pool. + * + * @return Maximum number of connections. + */ + protected int getMaxSize() { + return this.maxSize; + } + + /** + * Get the minimum number of connections in this pool. + * + * @return Minimum number of connections. + */ + protected int getMinSize() { + return this.minSize; + } + + /** + * Get the connection pool identifier. + * + * @return Connection pool identifier. + */ + protected ConnectionPoolId getConnectionPoolId() { + return this.connectionPoolId; + } + + /** + * Return the next connection round-robin. + * + * @return Connection context. + */ + protected ConnectionContext getConnection() { + + this.lastActiveTime = Time.now(); + + // Get a connection from the pool following round-robin + ConnectionContext conn = null; + List<ConnectionContext> tmpConnections = this.connections; + int size = tmpConnections.size(); + int threadIndex = this.clientIndex.getAndIncrement(); + for (int i=0; i<size; i++) { + int index = (threadIndex + i) % size; + conn = tmpConnections.get(index); + if (conn != null && conn.isUsable()) { + return conn; + } + } + + // We return a connection even if it's active + return conn; + } + + /** + * Add a connection to the current pool. It uses a Copy-On-Write approach. + * + * @param conn New connection to add to the pool. + */ + public synchronized void addConnection(ConnectionContext conn) { + List<ConnectionContext> tmpConnections = new ArrayList<>(this.connections); + tmpConnections.add(conn); + this.connections = tmpConnections; + + this.lastActiveTime = Time.now(); + } + + /** + * Remove connections from the current pool. + * + * @param num Number of connections to remove. + * @return Removed connections. + */ + public synchronized List<ConnectionContext> removeConnections(int num) { + List<ConnectionContext> removed = new LinkedList<>(); + + // Remove and close the last connection + List<ConnectionContext> tmpConnections = new ArrayList<>(); + for (int i=0; i<this.connections.size(); i++) { + ConnectionContext conn = this.connections.get(i); + if (i < this.minSize || i < this.connections.size() - num) { + tmpConnections.add(conn); + } else { + removed.add(conn); + } + } + this.connections = tmpConnections; + + return removed; + } + + /** + * Close the connection pool. + */ + protected synchronized void close() { + long timeSinceLastActive = TimeUnit.MILLISECONDS.toSeconds( + Time.now() - getLastActiveTime()); + LOG.debug("Shutting down connection pool \"{}\" used {} seconds ago", + this.connectionPoolId, timeSinceLastActive); + + for (ConnectionContext connection : this.connections) { + connection.close(); + } + this.connections.clear(); + } + + /** + * Number of connections in the pool. 
+ * + * @return Number of connections. + */ + protected int getNumConnections() { + return this.connections.size(); + } + + /** + * Number of active connections in the pool. + * + * @return Number of active connections. + */ + protected int getNumActiveConnections() { + int ret = 0; + + List<ConnectionContext> tmpConnections = this.connections; + for (ConnectionContext conn : tmpConnections) { + if (conn.isActive()) { + ret++; + } + } + return ret; + } + + /** + * Get the last time the connection pool was used. + * + * @return Last time the connection pool was used. + */ + protected long getLastActiveTime() { + return this.lastActiveTime; + } + + @Override + public String toString() { + return this.connectionPoolId.toString(); + } + + /** + * JSON representation of the connection pool. + * + * @return String representation of the JSON. + */ + public String getJSON() { + final Map<String, String> info = new LinkedHashMap<>(); + info.put("active", Integer.toString(getNumActiveConnections())); + info.put("total", Integer.toString(getNumConnections())); + if (LOG.isDebugEnabled()) { + List<ConnectionContext> tmpConnections = this.connections; + for (int i=0; i<tmpConnections.size(); i++) { + ConnectionContext connection = tmpConnections.get(i); + info.put(i + " active", Boolean.toString(connection.isActive())); + info.put(i + " closed", Boolean.toString(connection.isClosed())); + } + } + return JSON.toString(info); + } + + /** + * Create a new proxy wrapper for a client NN connection. + * @return Proxy for the target ClientProtocol that contains the user's + * security context. + * @throws IOException + */ + public ConnectionContext newConnection() throws IOException { + return newConnection(this.conf, this.namenodeAddress, this.ugi); + } + + /** + * Creates a proxy wrapper for a client NN connection. Each proxy contains + * context for a single user/security context. To maximize throughput it is + * recommended to use multiple connection per user+server, allowing multiple + * writes and reads to be dispatched in parallel. + * + * @param conf Configuration for the connection. + * @param nnAddress Address of server supporting the ClientProtocol. + * @param ugi User context. + * @return Proxy for the target ClientProtocol that contains the user's + * security context. + * @throws IOException If it cannot be created. 
+ */ + protected static ConnectionContext newConnection(Configuration conf, + String nnAddress, UserGroupInformation ugi) + throws IOException { + RPC.setProtocolEngine( + conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); + + final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy( + conf, + HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, + HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, + HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY, + HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT, + HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME); + + SocketFactory factory = SocketFactory.getDefault(); + if (UserGroupInformation.isSecurityEnabled()) { + SaslRpcServer.init(conf); + } + InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); + final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); + ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( + ClientNamenodeProtocolPB.class, version, socket, ugi, conf, + factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); + ClientProtocol client = new ClientNamenodeProtocolTranslatorPB(proxy); + Text dtService = SecurityUtil.buildTokenService(socket); + + ProxyAndInfo<ClientProtocol> clientProxy = + new ProxyAndInfo<ClientProtocol>(client, dtService, socket); + ConnectionContext connection = new ConnectionContext(clientProxy); + return connection; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java new file mode 100644 index 0000000..6e1ee9a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +/** + * Identifier for a connection for a user to a namenode. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ConnectionPoolId implements Comparable<ConnectionPoolId> { + + /** Namenode identifier. */ + private final String nnId; + /** Information about the user. */ + private final UserGroupInformation ugi; + + /** + * New connection pool identifier. + * + * @param ugi Information of the user issuing the request. + * @param nnId Namenode address with port. + */ + public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) { + this.nnId = nnId; + this.ugi = ugi; + } + + @Override + public int hashCode() { + int hash = new HashCodeBuilder(17, 31) + .append(this.nnId) + .append(this.ugi.toString()) + .append(this.getTokenIds()) + .toHashCode(); + return hash; + } + + @Override + public boolean equals(Object o) { + if (o instanceof ConnectionPoolId) { + ConnectionPoolId other = (ConnectionPoolId) o; + if (!this.nnId.equals(other.nnId)) { + return false; + } + if (!this.ugi.toString().equals(other.ugi.toString())) { + return false; + } + String thisTokens = this.getTokenIds().toString(); + String otherTokens = other.getTokenIds().toString(); + return thisTokens.equals(otherTokens); + } + return false; + } + + @Override + public String toString() { + return this.ugi + " " + this.getTokenIds() + "->" + this.nnId; + } + + @Override + public int compareTo(ConnectionPoolId other) { + int ret = this.nnId.compareTo(other.nnId); + if (ret == 0) { + ret = this.ugi.toString().compareTo(other.ugi.toString()); + } + if (ret == 0) { + String thisTokens = this.getTokenIds().toString(); + String otherTokens = other.getTokenIds().toString(); + ret = thisTokens.compareTo(otherTokens); + } + return ret; + } + + @VisibleForTesting + UserGroupInformation getUgi() { + return this.ugi; + } + + /** + * Get the token identifiers for this connection. + * @return List with the token identifiers. + */ + private List<String> getTokenIds() { + List<String> tokenIds = new ArrayList<>(); + Collection<Token<? extends TokenIdentifier>> tokens = this.ugi.getTokens(); + for (Token<? 
extends TokenIdentifier> token : tokens) { + byte[] tokenIdBytes = token.getIdentifier(); + String tokenId = Arrays.toString(tokenIdBytes); + tokenIds.add(tokenId); + } + Collections.sort(tokenIds); + return tokenIds; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java new file mode 100644 index 0000000..a2ac258 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/DFSRouter.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.util.ExitUtil.terminate; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.service.CompositeService.CompositeServiceShutdownHook; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tool to start the {@link Router} for Router-based federation. + */ +public final class DFSRouter { + + private static final Logger LOG = LoggerFactory.getLogger(DFSRouter.class); + + + /** Usage string for help message. */ + private static final String USAGE = "Usage: hdfs dfsrouter"; + + /** Priority of the Router shutdown hook. */ + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + + + private DFSRouter() { + // This is just a class to trigger the Router + } + + /** + * Main run loop for the router. + * + * @param argv parameters. 
+ */ + public static void main(String[] argv) { + if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) { + System.exit(0); + } + + try { + StringUtils.startupShutdownMessage(Router.class, argv, LOG); + + Router router = new Router(); + + ShutdownHookManager.get().addShutdownHook( + new CompositeServiceShutdownHook(router), SHUTDOWN_HOOK_PRIORITY); + + Configuration conf = new HdfsConfiguration(); + router.init(conf); + router.start(); + } catch (Throwable e) { + LOG.error("Failed to start router", e); + terminate(1, e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java new file mode 100644 index 0000000..d2b2d50 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; + +/** + * Module that implements all the RPC calls in + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to + * Erasure Coding in the {@link RouterRpcServer}. + */ +public class ErasureCoding { + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + /** Interface to identify the active NN for a nameservice or blockpool ID. 
*/ + private final ActiveNamenodeResolver namenodeResolver; + + + public ErasureCoding(RouterRpcServer server) { + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + this.namenodeResolver = this.rpcClient.getNamenodeResolver(); + } + + public ErasureCodingPolicyInfo[] getErasureCodingPolicies() + throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getErasureCodingPolicies"); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + Map<FederationNamespaceInfo, ErasureCodingPolicyInfo[]> ret = + rpcClient.invokeConcurrent( + nss, method, true, false, ErasureCodingPolicyInfo[].class); + return merge(ret, ErasureCodingPolicyInfo.class); + } + + public Map<String, String> getErasureCodingCodecs() throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getErasureCodingCodecs"); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + @SuppressWarnings("rawtypes") + Map<FederationNamespaceInfo, Map> retCodecs = + rpcClient.invokeConcurrent( + nss, method, true, false, Map.class); + + Map<String, String> ret = new HashMap<>(); + Object obj = retCodecs; + @SuppressWarnings("unchecked") + Map<FederationNamespaceInfo, Map<String, String>> results = + (Map<FederationNamespaceInfo, Map<String, String>>)obj; + Collection<Map<String, String>> allCodecs = results.values(); + for (Map<String, String> codecs : allCodecs) { + ret.putAll(codecs); + } + + return ret; + } + + public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( + ErasureCodingPolicy[] policies) throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("addErasureCodingPolicies", + new Class<?>[] {ErasureCodingPolicy[].class}, new Object[] {policies}); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + Map<FederationNamespaceInfo, AddErasureCodingPolicyResponse[]> ret = + rpcClient.invokeConcurrent( + nss, method, true, false, AddErasureCodingPolicyResponse[].class); + + return merge(ret, AddErasureCodingPolicyResponse.class); + } + + public void removeErasureCodingPolicy(String ecPolicyName) + throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("removeErasureCodingPolicy", + new Class<?>[] {String.class}, ecPolicyName); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + public void disableErasureCodingPolicy(String ecPolicyName) + throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("disableErasureCodingPolicy", + new Class<?>[] {String.class}, ecPolicyName); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + public void enableErasureCodingPolicy(String ecPolicyName) + throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("enableErasureCodingPolicy", + new Class<?>[] {String.class}, ecPolicyName); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + public ErasureCodingPolicy getErasureCodingPolicy(String src) + throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + final List<RemoteLocation> locations = + 
rpcServer.getLocationsForPath(src, true); + RemoteMethod remoteMethod = new RemoteMethod("getErasureCodingPolicy", + new Class<?>[] {String.class}, new RemoteParam()); + ErasureCodingPolicy ret = rpcClient.invokeSequential( + locations, remoteMethod, null, null); + return ret; + } + + public void setErasureCodingPolicy(String src, String ecPolicyName) + throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + final List<RemoteLocation> locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod remoteMethod = new RemoteMethod("setErasureCodingPolicy", + new Class<?>[] {String.class, String.class}, + new RemoteParam(), ecPolicyName); + rpcClient.invokeSequential(locations, remoteMethod, null, null); + } + + public void unsetErasureCodingPolicy(String src) throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + final List<RemoteLocation> locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod remoteMethod = new RemoteMethod("unsetErasureCodingPolicy", + new Class<?>[] {String.class}, new RemoteParam()); + rpcClient.invokeSequential(locations, remoteMethod, null, null); + } + + public ECBlockGroupStats getECBlockGroupStats() throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getECBlockGroupStats"); + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + Map<FederationNamespaceInfo, ECBlockGroupStats> allStats = + rpcClient.invokeConcurrent( + nss, method, true, false, ECBlockGroupStats.class); + + // Merge the stats from all the namespaces + long lowRedundancyBlockGroups = 0; + long corruptBlockGroups = 0; + long missingBlockGroups = 0; + long bytesInFutureBlockGroups = 0; + long pendingDeletionBlocks = 0; + for (ECBlockGroupStats stats : allStats.values()) { + lowRedundancyBlockGroups += stats.getLowRedundancyBlockGroups(); + corruptBlockGroups += stats.getCorruptBlockGroups(); + missingBlockGroups += stats.getMissingBlockGroups(); + bytesInFutureBlockGroups += stats.getBytesInFutureBlockGroups(); + pendingDeletionBlocks += stats.getPendingDeletionBlocks(); + } + return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups, + missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/62a819d5/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java new file mode 100644 index 0000000..f8c7a9b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.reflect.Constructor; +import java.net.URL; +import java.net.URLConnection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.util.VersionInfo; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utilities for managing HDFS federation. + */ +public final class FederationUtil { + + private static final Logger LOG = + LoggerFactory.getLogger(FederationUtil.class); + + private FederationUtil() { + // Utility Class + } + + /** + * Get a JMX data from a web endpoint. + * + * @param beanQuery JMX bean. + * @param webAddress Web address of the JMX endpoint. + * @return JSON with the JMX data + */ + public static JSONArray getJmx(String beanQuery, String webAddress) { + JSONArray ret = null; + BufferedReader reader = null; + try { + String host = webAddress; + int port = -1; + if (webAddress.indexOf(":") > 0) { + String[] webAddressSplit = webAddress.split(":"); + host = webAddressSplit[0]; + port = Integer.parseInt(webAddressSplit[1]); + } + URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery); + URLConnection conn = jmxURL.openConnection(); + conn.setConnectTimeout(5 * 1000); + conn.setReadTimeout(5 * 1000); + InputStream in = conn.getInputStream(); + InputStreamReader isr = new InputStreamReader(in, "UTF-8"); + reader = new BufferedReader(isr); + + StringBuilder sb = new StringBuilder(); + String line = null; + while ((line = reader.readLine()) != null) { + sb.append(line); + } + String jmxOutput = sb.toString(); + + // Parse JSON + JSONObject json = new JSONObject(jmxOutput); + ret = json.getJSONArray("beans"); + } catch (IOException e) { + LOG.error("Cannot read JMX bean {} from server {}: {}", + beanQuery, webAddress, e.getMessage()); + } catch (JSONException e) { + LOG.error("Cannot parse JMX output for {} from server {}: {}", + beanQuery, webAddress, e.getMessage()); + } catch (Exception e) { + LOG.error("Cannot parse JMX output for {} from server {}: {}", + beanQuery, webAddress, e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + LOG.error("Problem closing {}", webAddress, e); + } + } + } + return ret; + } + + /** + * Fetch the Hadoop version string for this jar. + * + * @return Hadoop version string, e.g., 3.0.1. + */ + public static String getVersion() { + return VersionInfo.getVersion(); + } + + /** + * Fetch the build/compile information for this jar. + * + * @return String Compilation info. 
+ */ + public static String getCompileInfo() { + return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " + + VersionInfo.getBranch(); + } + + /** + * Create an instance of an interface with a constructor using a context. + * + * @param conf Configuration for the class names. + * @param context Context object to pass to the instance. + * @param contextClass Type of the context passed to the constructor. + * @param clazz Class of the object to return. + * @return New instance of the specified class that implements the desired + * interface and a single parameter constructor containing a + * StateStore reference. + */ + private static <T, R> T newInstance(final Configuration conf, + final R context, final Class<R> contextClass, final Class<T> clazz) { + try { + if (contextClass == null) { + // Default constructor if no context + Constructor<T> constructor = clazz.getConstructor(); + return constructor.newInstance(); + } else { + // Constructor with context + Constructor<T> constructor = clazz.getConstructor( + Configuration.class, contextClass); + return constructor.newInstance(conf, context); + } + } catch (ReflectiveOperationException e) { + LOG.error("Could not instantiate: {}", clazz.getSimpleName(), e); + return null; + } + } + + /** + * Creates an instance of a FileSubclusterResolver from the configuration. + * + * @param conf Configuration that defines the file resolver class. + * @param router Router service. + * @return New file subcluster resolver. + */ + public static FileSubclusterResolver newFileSubclusterResolver( + Configuration conf, Router router) { + Class<? extends FileSubclusterResolver> clazz = conf.getClass( + RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT, + FileSubclusterResolver.class); + return newInstance(conf, router, Router.class, clazz); + } + + /** + * Creates an instance of an ActiveNamenodeResolver from the configuration. + * + * @param conf Configuration that defines the namenode resolver class. + * @param stateStore State store passed to class constructor. + * @return New active namenode resolver. + */ + public static ActiveNamenodeResolver newActiveNamenodeResolver( + Configuration conf, StateStoreService stateStore) { + Class<? extends ActiveNamenodeResolver> clazz = conf.getClass( + RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT, + ActiveNamenodeResolver.class); + return newInstance(conf, stateStore, StateStoreService.class, clazz); + } + + /** + * Check if the given path is the child of parent path. + * @param path Path to be check. + * @param parent Parent path. + * @return True if parent path is parent entry for given path. + */ + public static boolean isParentEntry(final String path, final String parent) { + if (!path.startsWith(parent)) { + return false; + } + + if (path.equals(parent)) { + return true; + } + + return path.charAt(parent.length()) == Path.SEPARATOR_CHAR + || parent.equals(Path.SEPARATOR); + } +}
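
Note on DFSRouter.java above: the class is only a launcher, and the Router it starts is driven through the usual Hadoop service lifecycle (init, start, stop). The following is a minimal sketch of exercising that lifecycle directly, for example from a test harness; it mirrors what DFSRouter.main() does but omits the shutdown hook, and the class name RouterLifecycleSketch is invented for the example. In a real deployment the router is launched with "hdfs dfsrouter", as the USAGE string indicates.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.router.Router;

public class RouterLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();  // same configuration type DFSRouter.main() builds
    Router router = new Router();
    router.init(conf);                             // wires up the router services from the configuration
    router.start();
    try {
      // ... issue client RPCs against the router here ...
    } finally {
      router.stop();                               // tear the services back down when done
    }
  }
}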
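
Note on ErasureCoding.java above: every method follows the same shape. It checks the operation category on the RouterRpcServer, describes the target ClientProtocol call with a RemoteMethod, and then either fans the call out to all namespaces with invokeConcurrent() (merging the per-namespace results) or walks the resolved locations of a path with invokeSequential(). The fragment below sketches the concurrent variant only; the method name getErasureCodingPolicyNames and its String[] result are invented for illustration and are not part of ClientProtocol, and the fragment assumes it sits inside the ErasureCoding class so that rpcServer, rpcClient, namenodeResolver and the statically imported merge() are in scope.

public String[] getErasureCodingPolicyNames() throws IOException {
  // Reject the call early if the router is not currently serving reads.
  rpcServer.checkOperation(OperationCategory.READ);

  // Describe the remote call (the method name here is illustrative only).
  RemoteMethod method = new RemoteMethod("getErasureCodingPolicyNames");

  // Fan the call out to every registered namespace in parallel.
  Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
  Map<FederationNamespaceInfo, String[]> results =
      rpcClient.invokeConcurrent(nss, method, true, false, String[].class);

  // Collapse the per-namespace arrays into one answer, the same way
  // getErasureCodingPolicies() does through RouterRpcServer.merge().
  return merge(results, String.class);
}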
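
Note on FederationUtil.java above: two of the helpers are easy to misread from their javadoc alone, so a short usage sketch may help. isParentEntry() treats a path as a child of itself and rejects plain prefix matches ("/database" is not under "/data"); getJmx() returns the "beans" array from a /jmx endpoint, or null when the endpoint cannot be read or parsed. In the sketch the host name nn1.example.com is made up, 9870 is only the default NameNode HTTP port in Hadoop 3, and the bean query is an example; the class name FederationUtilSketch is likewise invented.

import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.codehaus.jettison.json.JSONArray;

public class FederationUtilSketch {
  public static void main(String[] args) throws Exception {
    // isParentEntry(path, parent): is 'parent' the same entry or a path ancestor?
    System.out.println(FederationUtil.isParentEntry("/data/tmp", "/data"));  // true
    System.out.println(FederationUtil.isParentEntry("/data", "/data"));      // true: a path is its own parent entry
    System.out.println(FederationUtil.isParentEntry("/database", "/data"));  // false: prefix match alone is not enough
    System.out.println(FederationUtil.isParentEntry("/data/tmp", "/"));      // true: root is an ancestor of everything

    // getJmx(bean query, "host:port"): may return null, so callers must check.
    JSONArray beans = FederationUtil.getJmx(
        "Hadoop:service=NameNode,name=FSNamesystemState", "nn1.example.com:9870");
    if (beans != null && beans.length() > 0) {
      System.out.println(beans.getJSONObject(0));
    }
  }
}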
