HDFS-11538. Move ClientProtocol HA proxies into hadoop-hdfs-client. Contributed by Huafeng Wang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c58c01fe Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c58c01fe Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c58c01fe Branch: refs/heads/HDFS-10467 Commit: c58c01fe043c0cf7182eccef1b7205adfa091201 Parents: 11406a4 Author: Andrew Wang <[email protected]> Authored: Tue Apr 4 23:05:24 2017 -0700 Committer: Inigo <[email protected]> Committed: Thu Apr 6 18:58:22 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSUtilClient.java | 13 + .../org/apache/hadoop/hdfs/HAUtilClient.java | 55 +++ .../hadoop/hdfs/NameNodeProxiesClient.java | 15 +- .../hdfs/client/HdfsClientConfigKeys.java | 1 + .../namenode/ha/ClientHAProxyFactory.java | 44 ++ .../ha/ConfiguredFailoverProxyProvider.java | 183 +++++++ .../hdfs/server/namenode/ha/HAProxyFactory.java | 44 ++ .../namenode/ha/IPFailoverProxyProvider.java | 126 +++++ .../ha/RequestHedgingProxyProvider.java | 234 +++++++++ .../ha/TestRequestHedgingProxyProvider.java | 476 +++++++++++++++++++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 +- .../java/org/apache/hadoop/hdfs/DFSUtil.java | 15 +- .../java/org/apache/hadoop/hdfs/HAUtil.java | 57 +-- .../org/apache/hadoop/hdfs/NameNodeProxies.java | 3 +- .../hadoop/hdfs/server/namenode/DfsServlet.java | 29 +- .../ha/ConfiguredFailoverProxyProvider.java | 216 --------- .../namenode/ha/IPFailoverProxyProvider.java | 132 ----- .../namenode/ha/NameNodeHAProxyFactory.java | 45 ++ .../ha/RequestHedgingProxyProvider.java | 241 ---------- .../hadoop/hdfs/TestDFSClientFailover.java | 4 +- .../org/apache/hadoop/hdfs/TestDFSUtil.java | 2 +- .../namenode/ha/TestDelegationTokensWithHA.java | 4 +- .../ha/TestRequestHedgingProxyProvider.java | 470 ------------------ 23 files changed, 1247 insertions(+), 1165 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index f9b2e8d..2e770cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -170,6 +170,19 @@ public class DFSUtilClient { } /** + * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from + * the configuration. + * + * @param conf configuration + * @return list of InetSocketAddresses + */ + public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses( + Configuration conf) { + return DFSUtilClient.getAddresses(conf, null, + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); + } + + /** * Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from * the configuration. 
* http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java index 9f28cfc..47288f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java @@ -20,15 +20,29 @@ package org.apache.hadoop.hdfs; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; import java.net.URI; +import java.util.Collection; import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX; +import static org.apache.hadoop.security.SecurityUtil.buildTokenService; @InterfaceAudience.Private public class HAUtilClient { + private static final Logger LOG = LoggerFactory.getLogger(HAUtilClient.class); + + private static final DelegationTokenSelector tokenSelector = + new DelegationTokenSelector(); + /** * @return true if the given nameNodeUri appears to be a logical URI. 
*/ @@ -92,4 +106,45 @@ public class HAUtilClient { public static boolean isTokenForLogicalUri(Token<?> token) { return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX); } + + /** + * Locate a delegation token associated with the given HA cluster URI, and if + * one is found, clone it to also represent the underlying namenode address. + * @param ugi the UGI to modify + * @param haUri the logical URI for the cluster + * @param nnAddrs collection of NNs in the cluster to which the token + * applies + */ + public static void cloneDelegationTokenForLogicalUri( + UserGroupInformation ugi, URI haUri, + Collection<InetSocketAddress> nnAddrs) { + // this cloning logic is only used by hdfs + Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri, + HdfsConstants.HDFS_URI_SCHEME); + Token<DelegationTokenIdentifier> haToken = + tokenSelector.selectToken(haService, ugi.getTokens()); + if (haToken != null) { + for (InetSocketAddress singleNNAddr : nnAddrs) { + // this is a minor hack to prevent physical HA tokens from being + // exposed to the user via UGI.getCredentials(), otherwise these + // cloned tokens may be inadvertently propagated to jobs + Token<DelegationTokenIdentifier> specificToken = + haToken.privateClone(buildTokenService(singleNNAddr)); + Text alias = new Text( + HAUtilClient.buildTokenServicePrefixForLogicalUri( + HdfsConstants.HDFS_URI_SCHEME) + + "//" + specificToken.getService()); + ugi.addToken(alias, specificToken); + if (LOG.isDebugEnabled()) { + LOG.debug("Mapped HA service delegation token for logical URI " + + haUri + " to namenode " + singleNNAddr); + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("No HA service delegation token found for logical URI " + + haUri); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java ---------------------------------------------------------------------- diff 
--git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java index 5ca7030..a092f02 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory; +import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,6 +214,14 @@ public class NameNodeProxiesClient { public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider( Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, AtomicBoolean fallbackToSimpleAuth) throws IOException { + return createFailoverProxyProvider(conf, nameNodeUri, xface, checkPort, + fallbackToSimpleAuth, new ClientHAProxyFactory<T>()); + } + + protected static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider( + Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, + AtomicBoolean fallbackToSimpleAuth, HAProxyFactory<T> proxyFactory) + throws IOException { Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null; AbstractNNFailoverProxyProvider<T> providerNN; try { @@ -223,9 +233,10 @@ public class NameNodeProxiesClient { } // Create a proxy provider instance. 
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass - .getConstructor(Configuration.class, URI.class, Class.class); + .getConstructor(Configuration.class, URI.class, + Class.class, HAProxyFactory.class); FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, - xface); + xface, proxyFactory); // If the proxy provider is of an old implementation, wrap it. if (!(provider instanceof AbstractNNFailoverProxyProvider)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 1a38806..c152a4b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -67,6 +67,7 @@ public interface HdfsClientConfigKeys { String PREFIX = "dfs.client."; String DFS_NAMESERVICES = "dfs.nameservices"; + String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870; String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"; int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java new file mode 100644 index 0000000..b887d87 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.NameNodeProxiesClient; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ClientHAProxyFactory<T> implements HAProxyFactory<T> { + @Override + @SuppressWarnings("unchecked") + public T createProxy(Configuration conf, InetSocketAddress nnAddr, + Class<T> xface, UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol( + nnAddr, conf, ugi, false, fallbackToSimpleAuth); + } + + @Override + public T createProxy(Configuration conf, InetSocketAddress nnAddr, + Class<T> xface, UserGroupInformation ugi, boolean withRetries) + throws IOException { + return createProxy(conf, nnAddr, xface, ugi, withRetries, null); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java new file mode 100644 index 0000000..e9c8791 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.HAUtilClient; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A FailoverProxyProvider implementation which allows one to configure + * multiple URIs to connect to during fail-over. A random configured address is + * tried first, and on a fail-over event the other addresses are tried + * sequentially in a random order. 
+ */ +public class ConfiguredFailoverProxyProvider<T> extends + AbstractNNFailoverProxyProvider<T> { + + private static final Logger LOG = + LoggerFactory.getLogger(ConfiguredFailoverProxyProvider.class); + + protected final Configuration conf; + protected final List<AddressRpcProxyPair<T>> proxies = + new ArrayList<AddressRpcProxyPair<T>>(); + private final UserGroupInformation ugi; + protected final Class<T> xface; + + private int currentProxyIndex = 0; + private final HAProxyFactory<T> factory; + + public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, + Class<T> xface, HAProxyFactory<T> factory) { + this.xface = xface; + this.conf = new Configuration(conf); + int maxRetries = this.conf.getInt( + HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY, + HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT); + this.conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + maxRetries); + + int maxRetriesOnSocketTimeouts = this.conf.getInt( + HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); + this.conf.setInt( + CommonConfigurationKeysPublic + .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + maxRetriesOnSocketTimeouts); + + try { + ugi = UserGroupInformation.getCurrentUser(); + + Map<String, Map<String, InetSocketAddress>> map = + DFSUtilClient.getHaNnRpcAddresses(conf); + Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost()); + + if (addressesInNN == null || addressesInNN.size() == 0) { + throw new RuntimeException("Could not find any configured addresses " + + "for URI " + uri); + } + + Collection<InetSocketAddress> addressesOfNns = addressesInNN.values(); + for (InetSocketAddress address : addressesOfNns) { + proxies.add(new AddressRpcProxyPair<T>(address)); + } + // Randomize the list to prevent all clients pointing to the same one + boolean randomized = conf.getBoolean( + 
HdfsClientConfigKeys.Failover.RANDOM_ORDER, + HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); + if (randomized) { + Collections.shuffle(proxies); + } + + // The client may have a delegation token set for the logical + // URI of the cluster. Clone this token to apply to each of the + // underlying IPC addresses so that the IPC code can find it. + HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns); + this.factory = factory; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Class<T> getInterface() { + return xface; + } + + /** + * Lazily initialize the RPC proxy object. + */ + @Override + public synchronized ProxyInfo<T> getProxy() { + AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex); + if (current.namenode == null) { + try { + current.namenode = factory.createProxy(conf, + current.address, xface, ugi, false, getFallbackToSimpleAuth()); + } catch (IOException e) { + LOG.error("Failed to create RPC proxy to NameNode", e); + throw new RuntimeException(e); + } + } + return new ProxyInfo<T>(current.namenode, current.address.toString()); + } + + @Override + public void performFailover(T currentProxy) { + incrementProxyIndex(); + } + + synchronized void incrementProxyIndex() { + currentProxyIndex = (currentProxyIndex + 1) % proxies.size(); + } + + /** + * A little pair object to store the address and connected RPC proxy object to + * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null. + */ + private static class AddressRpcProxyPair<T> { + public final InetSocketAddress address; + public T namenode; + + public AddressRpcProxyPair(InetSocketAddress address) { + this.address = address; + } + } + + /** + * Close all the proxy objects which have been opened over the lifetime of + * this proxy provider. 
+ */ + @Override + public synchronized void close() throws IOException { + for (AddressRpcProxyPair<T> proxy : proxies) { + if (proxy.namenode != null) { + if (proxy.namenode instanceof Closeable) { + ((Closeable)proxy.namenode).close(); + } else { + RPC.stopProxy(proxy.namenode); + } + } + } + } + + /** + * Logical URI is required for this failover proxy provider. + */ + @Override + public boolean useLogicalURI() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java new file mode 100644 index 0000000..f92a74f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This interface aims to decouple the proxy creation implementation that + * is used in {@link AbstractNNFailoverProxyProvider}. Client side can use + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} to initialize the + * proxy while the server side can use NamenodeProtocols. + */ +@InterfaceAudience.Private +public interface HAProxyFactory<T> { + + T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface, + UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException; + + T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface, + UserGroupInformation ugi, boolean withRetries) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java new file mode 100644 index 0000000..ed250a0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * A NNFailoverProxyProvider implementation which works on IP failover setup. + * Only one proxy is used to connect to both servers and switching between + * the servers is done by the environment/infrastructure, which guarantees + * clients can consistently reach only one node at a time. + * + * Clients with a live connection will likely get connection reset after an + * IP failover. This case will be handled by the + * FailoverOnNetworkExceptionRetry retry policy. I.e. if the call is + * not idempotent, it won't get retried. + * + * A connection reset while setting up a connection (i.e. before sending a + * request) will be handled in ipc client. + * + * The namenode URI must contain a resolvable host name. 
+ */ +public class IPFailoverProxyProvider<T> extends + AbstractNNFailoverProxyProvider<T> { + private final Configuration conf; + private final Class<T> xface; + private final URI nameNodeUri; + private final HAProxyFactory<T> factory; + private ProxyInfo<T> nnProxyInfo = null; + + public IPFailoverProxyProvider(Configuration conf, URI uri, + Class<T> xface, HAProxyFactory<T> factory) { + this.xface = xface; + this.nameNodeUri = uri; + this.factory = factory; + + this.conf = new Configuration(conf); + int maxRetries = this.conf.getInt( + HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY, + HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT); + this.conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + maxRetries); + + int maxRetriesOnSocketTimeouts = this.conf.getInt( + HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); + this.conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + maxRetriesOnSocketTimeouts); + } + + @Override + public Class<T> getInterface() { + return xface; + } + + @Override + public synchronized ProxyInfo<T> getProxy() { + // Create a non-ha proxy if not already created. 
+ if (nnProxyInfo == null) { + try { + // Create a proxy that is not wrapped in RetryProxy + InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri); + nnProxyInfo = new ProxyInfo<T>(factory.createProxy(conf, nnAddr, xface, + UserGroupInformation.getCurrentUser(), false), nnAddr.toString()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + return nnProxyInfo; + } + + /** Nothing to do for IP failover */ + @Override + public void performFailover(T currentProxy) { + } + + /** + * Close the proxy, + */ + @Override + public synchronized void close() throws IOException { + if (nnProxyInfo == null) { + return; + } + if (nnProxyInfo.proxy instanceof Closeable) { + ((Closeable)nnProxyInfo.proxy).close(); + } else { + RPC.stopProxy(nnProxyInfo.proxy); + } + } + + /** + * Logical URI is not used for IP failover. + */ + @Override + public boolean useLogicalURI() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java new file mode 100644 index 0000000..b94e94d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; + +import org.apache.hadoop.io.retry.MultiException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A FailoverProxyProvider implementation that technically does not "failover" + * per-se. It constructs a wrapper proxy that sends the request to ALL + * underlying proxies simultaneously. It assumes that in an HA setup, there will + * be only one Active, and the active should respond faster than any configured + * standbys. Once it receives a response from any one of the configured proxies, + * outstanding requests to other proxies are immediately cancelled. 
+ */ +public class RequestHedgingProxyProvider<T> extends + ConfiguredFailoverProxyProvider<T> { + + public static final Logger LOG = + LoggerFactory.getLogger(RequestHedgingProxyProvider.class); + + class RequestHedgingInvocationHandler implements InvocationHandler { + + final Map<String, ProxyInfo<T>> targetProxies; + + public RequestHedgingInvocationHandler( + Map<String, ProxyInfo<T>> targetProxies) { + this.targetProxies = new HashMap<>(targetProxies); + } + + /** + * Creates a Executor and invokes all proxies concurrently. This + * implementation assumes that Clients have configured proper socket + * timeouts, else the call can block forever. + * + * @param proxy + * @param method + * @param args + * @return + * @throws Throwable + */ + @Override + public Object + invoke(Object proxy, final Method method, final Object[] args) + throws Throwable { + Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>(); + int numAttempts = 0; + + ExecutorService executor = null; + CompletionService<Object> completionService; + try { + // Optimization : if only 2 proxies are configured and one had failed + // over, then we dont need to create a threadpool etc. 
+ targetProxies.remove(toIgnore); + if (targetProxies.size() == 1) { + ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next(); + Object retVal = method.invoke(proxyInfo.proxy, args); + successfulProxy = proxyInfo; + return retVal; + } + executor = Executors.newFixedThreadPool(proxies.size()); + completionService = new ExecutorCompletionService<>(executor); + for (final Map.Entry<String, ProxyInfo<T>> pEntry : + targetProxies.entrySet()) { + Callable<Object> c = new Callable<Object>() { + @Override + public Object call() throws Exception { + LOG.trace("Invoking method {} on proxy {}", method, + pEntry.getValue().proxyInfo); + return method.invoke(pEntry.getValue().proxy, args); + } + }; + proxyMap.put(completionService.submit(c), pEntry.getValue()); + numAttempts++; + } + + Map<String, Exception> badResults = new HashMap<>(); + while (numAttempts > 0) { + Future<Object> callResultFuture = completionService.take(); + Object retVal; + try { + retVal = callResultFuture.get(); + successfulProxy = proxyMap.get(callResultFuture); + LOG.debug("Invocation successful on [{}]", + successfulProxy.proxyInfo); + return retVal; + } catch (Exception ex) { + ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture); + logProxyException(ex, tProxyInfo.proxyInfo); + badResults.put(tProxyInfo.proxyInfo, unwrapException(ex)); + LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo); + numAttempts--; + } + } + + // At this point we should have All bad results (Exceptions) + // Or should have returned with successful result. 
+ if (badResults.size() == 1) { + throw badResults.values().iterator().next(); + } else { + throw new MultiException(badResults); + } + } finally { + if (executor != null) { + LOG.trace("Shutting down threadpool executor"); + executor.shutdownNow(); + } + } + } + } + + + private volatile ProxyInfo<T> successfulProxy = null; + private volatile String toIgnore = null; + + public RequestHedgingProxyProvider(Configuration conf, URI uri, + Class<T> xface, HAProxyFactory<T> proxyFactory) { + super(conf, uri, xface, proxyFactory); + } + + @SuppressWarnings("unchecked") + @Override + public synchronized ProxyInfo<T> getProxy() { + if (successfulProxy != null) { + return successfulProxy; + } + Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>(); + StringBuilder combinedInfo = new StringBuilder("["); + for (int i = 0; i < proxies.size(); i++) { + ProxyInfo<T> pInfo = super.getProxy(); + incrementProxyIndex(); + targetProxyInfos.put(pInfo.proxyInfo, pInfo); + combinedInfo.append(pInfo.proxyInfo).append(','); + } + combinedInfo.append(']'); + T wrappedProxy = (T) Proxy.newProxyInstance( + RequestHedgingInvocationHandler.class.getClassLoader(), + new Class<?>[]{xface}, + new RequestHedgingInvocationHandler(targetProxyInfos)); + return new ProxyInfo<T>(wrappedProxy, combinedInfo.toString()); + } + + @Override + public synchronized void performFailover(T currentProxy) { + toIgnore = successfulProxy.proxyInfo; + successfulProxy = null; + } + + /** + * Check the exception returned by the proxy log a warning message if it's + * not a StandbyException (expected exception). + * @param ex Exception to evaluate. + * @param proxyInfo Information of the proxy reporting the exception. 
+ */ + private void logProxyException(Exception ex, String proxyInfo) { + if (isStandbyException(ex)) { + LOG.debug("Invocation returned standby exception on [{}]", proxyInfo); + } else { + LOG.warn("Invocation returned exception on [{}]", proxyInfo); + } + } + + /** + * Check if the returned exception is caused by an standby namenode. + * @param ex Exception to check. + * @return If the exception is caused by an standby namenode. + */ + private boolean isStandbyException(Exception ex) { + Exception exception = unwrapException(ex); + if (exception instanceof RemoteException) { + return ((RemoteException) exception).unwrapRemoteException() + instanceof StandbyException; + } + return false; + } + + /** + * Unwraps the exception. <p> + * Example: + * <blockquote><pre> + * if ex is + * ExecutionException(InvocationTargetExeption(SomeException)) + * returns SomeException + * </pre></blockquote> + * + * @return unwrapped exception + */ + private Exception unwrapException(Exception ex) { + if (ex != null) { + Throwable cause = ex.getCause(); + if (cause instanceof Exception) { + Throwable innerCause = cause.getCause(); + if (innerCause instanceof Exception) { + return (Exception) innerCause; + } + return (Exception) cause; + } + } + return ex; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java new file mode 100644 index 0000000..724b5f0 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java @@ -0,0 +1,476 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.io.retry.MultiException; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import 
org.mockito.Matchers; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.collect.Lists; + +public class TestRequestHedgingProxyProvider { + + private Configuration conf; + private URI nnUri; + private String ns; + + @BeforeClass + public static void setupClass() throws Exception { + GenericTestUtils.setLogLevel(RequestHedgingProxyProvider.LOG, Level.TRACE); + } + + @Before + public void setup() throws URISyntaxException { + ns = "mycluster-" + Time.monotonicNow(); + nnUri = new URI("hdfs://" + ns); + conf = new Configuration(); + conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns); + conf.set( + HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2"); + conf.set( + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1", + "machine1.foo.bar:9820"); + conf.set( + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2", + "machine2.foo.bar:9820"); + } + + @Test + public void testHedgingWhenOneFails() throws Exception { + final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class); + Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(1000); + return new long[]{1}; + } + }); + final ClientProtocol badMock = Mockito.mock(ClientProtocol.class); + Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); + + RequestHedgingProxyProvider<ClientProtocol> provider = + new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, + createFactory(badMock, goodMock)); + long[] stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Mockito.verify(badMock).getStats(); + Mockito.verify(goodMock).getStats(); + } + + @Test + public void testHedgingWhenOneIsSlow() throws Exception { + final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class); + 
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(1000); + return new long[]{1}; + } + }); + final ClientProtocol badMock = Mockito.mock(ClientProtocol.class); + Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); + + RequestHedgingProxyProvider<ClientProtocol> provider = + new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, + createFactory(goodMock, badMock)); + long[] stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(1, stats[0]); + Mockito.verify(badMock).getStats(); + Mockito.verify(goodMock).getStats(); + } + + @Test + public void testHedgingWhenBothFail() throws Exception { + ClientProtocol badMock = Mockito.mock(ClientProtocol.class); + Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); + ClientProtocol worseMock = Mockito.mock(ClientProtocol.class); + Mockito.when(worseMock.getStats()).thenThrow( + new IOException("Worse mock !!")); + + RequestHedgingProxyProvider<ClientProtocol> provider = + new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, + createFactory(badMock, worseMock)); + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since both namenodes throw IOException !!"); + } catch (Exception e) { + Assert.assertTrue(e instanceof MultiException); + } + Mockito.verify(badMock).getStats(); + Mockito.verify(worseMock).getStats(); + } + + @Test + public void testPerformFailover() throws Exception { + final AtomicInteger counter = new AtomicInteger(0); + final int[] isGood = {1}; + final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class); + Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + if (isGood[0] == 1) { + Thread.sleep(1000); + return 
new long[]{1}; + } + throw new IOException("Was Good mock !!"); + } + }); + final ClientProtocol badMock = Mockito.mock(ClientProtocol.class); + Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + if (isGood[0] == 2) { + Thread.sleep(1000); + return new long[]{2}; + } + throw new IOException("Bad mock !!"); + } + }); + RequestHedgingProxyProvider<ClientProtocol> provider = + new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, + createFactory(goodMock, badMock)); + long[] stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(1, stats[0]); + Assert.assertEquals(2, counter.get()); + Mockito.verify(badMock).getStats(); + Mockito.verify(goodMock).getStats(); + + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(1, stats[0]); + // Ensure only the previous successful one is invoked + Mockito.verifyNoMoreInteractions(badMock); + Assert.assertEquals(3, counter.get()); + + // Flip to standby.. so now this should fail + isGood[0] = 2; + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since previously successful proxy now fails "); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof IOException); + } + + Assert.assertEquals(4, counter.get()); + + provider.performFailover(provider.getProxy().proxy); + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(2, stats[0]); + + // Counter should update only once + Assert.assertEquals(5, counter.get()); + + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(2, stats[0]); + + // Counter updates only once now + Assert.assertEquals(6, counter.get()); + + // Flip back to old active.. 
so now this should fail + isGood[0] = 1; + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since previously successful proxy now fails "); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof IOException); + } + + Assert.assertEquals(7, counter.get()); + + provider.performFailover(provider.getProxy().proxy); + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + // Ensure correct proxy was called + Assert.assertEquals(1, stats[0]); + } + + @Test + public void testPerformFailoverWith3Proxies() throws Exception { + conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, + "nn1,nn2,nn3"); + conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3", + "machine3.foo.bar:9820"); + + final AtomicInteger counter = new AtomicInteger(0); + final int[] isGood = {1}; + final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class); + Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + if (isGood[0] == 1) { + Thread.sleep(1000); + return new long[]{1}; + } + throw new IOException("Was Good mock !!"); + } + }); + final ClientProtocol badMock = Mockito.mock(ClientProtocol.class); + Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + if (isGood[0] == 2) { + Thread.sleep(1000); + return new long[]{2}; + } + throw new IOException("Bad mock !!"); + } + }); + final ClientProtocol worseMock = Mockito.mock(ClientProtocol.class); + Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() { + @Override + public long[] answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + if (isGood[0] == 3) { + Thread.sleep(1000); + return new long[]{3}; + } + throw new IOException("Worse mock !!"); + } + 
}); + + RequestHedgingProxyProvider<ClientProtocol> provider = + new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, + createFactory(goodMock, badMock, worseMock)); + long[] stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(1, stats[0]); + Assert.assertEquals(3, counter.get()); + Mockito.verify(badMock).getStats(); + Mockito.verify(goodMock).getStats(); + Mockito.verify(worseMock).getStats(); + + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(1, stats[0]); + // Ensure only the previous successful one is invoked + Mockito.verifyNoMoreInteractions(badMock); + Mockito.verifyNoMoreInteractions(worseMock); + Assert.assertEquals(4, counter.get()); + + // Flip to standby.. so now this should fail + isGood[0] = 2; + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since previously successful proxy now fails "); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof IOException); + } + + Assert.assertEquals(5, counter.get()); + + provider.performFailover(provider.getProxy().proxy); + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(2, stats[0]); + + // Counter updates twice since both proxies are tried on failure + Assert.assertEquals(7, counter.get()); + + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(2, stats[0]); + + // Counter updates only once now + Assert.assertEquals(8, counter.get()); + + // Flip to Other standby.. 
so now this should fail + isGood[0] = 3; + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since previously successful proxy now fails "); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof IOException); + } + + // Counter should ipdate only 1 time + Assert.assertEquals(9, counter.get()); + + provider.performFailover(provider.getProxy().proxy); + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + + // Ensure correct proxy was called + Assert.assertEquals(3, stats[0]); + + // Counter updates twice since both proxies are tried on failure + Assert.assertEquals(11, counter.get()); + + stats = provider.getProxy().proxy.getStats(); + Assert.assertTrue(stats.length == 1); + Assert.assertEquals(3, stats[0]); + + // Counter updates only once now + Assert.assertEquals(12, counter.get()); + } + + @Test + public void testHedgingWhenFileNotFoundException() throws Exception { + ClientProtocol active = Mockito.mock(ClientProtocol.class); + Mockito + .when(active.getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong())) + .thenThrow(new RemoteException("java.io.FileNotFoundException", + "File does not exist!")); + + ClientProtocol standby = Mockito.mock(ClientProtocol.class); + Mockito + .when(standby.getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong())) + .thenThrow( + new RemoteException("org.apache.hadoop.ipc.StandbyException", + "Standby NameNode")); + + RequestHedgingProxyProvider<ClientProtocol> provider = + new RequestHedgingProxyProvider<>(conf, nnUri, + ClientProtocol.class, createFactory(active, standby)); + try { + provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L); + Assert.fail("Should fail since the active namenode throws" + + " FileNotFoundException!"); + } catch (MultiException me) { + for (Exception ex : me.getExceptions().values()) { + Exception rEx = ((RemoteException) ex).unwrapRemoteException(); + if (rEx instanceof 
StandbyException) { + continue; + } + Assert.assertTrue(rEx instanceof FileNotFoundException); + } + } + Mockito.verify(active).getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong()); + Mockito.verify(standby).getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong()); + } + + @Test + public void testHedgingWhenConnectException() throws Exception { + ClientProtocol active = Mockito.mock(ClientProtocol.class); + Mockito.when(active.getStats()).thenThrow(new ConnectException()); + + ClientProtocol standby = Mockito.mock(ClientProtocol.class); + Mockito.when(standby.getStats()) + .thenThrow( + new RemoteException("org.apache.hadoop.ipc.StandbyException", + "Standby NameNode")); + + RequestHedgingProxyProvider<ClientProtocol> provider = + new RequestHedgingProxyProvider<>(conf, nnUri, + ClientProtocol.class, createFactory(active, standby)); + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since the active namenode throws" + + " ConnectException!"); + } catch (MultiException me) { + for (Exception ex : me.getExceptions().values()) { + if (ex instanceof RemoteException) { + Exception rEx = ((RemoteException) ex) + .unwrapRemoteException(); + Assert.assertTrue("Unexpected RemoteException: " + rEx.getMessage(), + rEx instanceof StandbyException); + } else { + Assert.assertTrue(ex instanceof ConnectException); + } + } + } + Mockito.verify(active).getStats(); + Mockito.verify(standby).getStats(); + } + + @Test + public void testHedgingWhenConnectAndEOFException() throws Exception { + ClientProtocol active = Mockito.mock(ClientProtocol.class); + Mockito.when(active.getStats()).thenThrow(new EOFException()); + + ClientProtocol standby = Mockito.mock(ClientProtocol.class); + Mockito.when(standby.getStats()).thenThrow(new ConnectException()); + + RequestHedgingProxyProvider<ClientProtocol> provider = + new RequestHedgingProxyProvider<>(conf, nnUri, + ClientProtocol.class, createFactory(active, standby)); 
+ try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since both active and standby namenodes throw" + + " Exceptions!"); + } catch (MultiException me) { + for (Exception ex : me.getExceptions().values()) { + if (!(ex instanceof ConnectException) && + !(ex instanceof EOFException)) { + Assert.fail("Unexpected Exception " + ex.getMessage()); + } + } + } + Mockito.verify(active).getStats(); + Mockito.verify(standby).getStats(); + } + + private HAProxyFactory<ClientProtocol> createFactory( + ClientProtocol... protos) { + final Iterator<ClientProtocol> iterator = + Lists.newArrayList(protos).iterator(); + return new HAProxyFactory<ClientProtocol>() { + @Override + public ClientProtocol createProxy(Configuration conf, + InetSocketAddress nnAddr, Class<ClientProtocol> xface, + UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + return iterator.next(); + } + + @Override + public ClientProtocol createProxy(Configuration conf, + InetSocketAddress nnAddr, Class<ClientProtocol> xface, + UserGroupInformation ugi, boolean withRetries) throws IOException { + return iterator.next(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 8fea41f..58a2823 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -142,7 +142,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT 
= "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT; public static final String DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host"; - public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; + public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; public static final String DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host"; public static final String DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address"; public static final String DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 23166e2..47e1c0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -450,19 +450,6 @@ public class DFSUtil { } /** - * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from - * the configuration. - * - * @param conf configuration - * @return list of InetSocketAddresses - */ - public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses( - Configuration conf) { - return DFSUtilClient.getAddresses(conf, null, - DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); - } - - /** * Returns list of InetSocketAddress corresponding to backup node rpc * addresses from the configuration. 
* @@ -693,7 +680,7 @@ public class DFSUtil { public static String nnAddressesAsString(Configuration conf) { Map<String, Map<String, InetSocketAddress>> addresses = - getHaNnRpcAddresses(conf); + DFSUtilClient.getHaNnRpcAddresses(conf); return addressMapToString(addresses); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index ea535e9..3556086 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -29,7 +29,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KE import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY; -import static org.apache.hadoop.security.SecurityUtil.buildTokenService; import java.io.IOException; import java.net.InetSocketAddress; @@ -39,8 +38,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -48,17 +45,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; -import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -67,12 +59,6 @@ import com.google.common.collect.Lists; @InterfaceAudience.Private public class HAUtil { - private static final Log LOG = - LogFactory.getLog(HAUtil.class); - - private static final DelegationTokenSelector tokenSelector = - new DelegationTokenSelector(); - private static final String[] HA_SPECIAL_INDEPENDENT_KEYS = new String[]{ DFS_NAMENODE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_BIND_HOST_KEY, @@ -97,7 +83,7 @@ public class HAUtil { */ public static boolean isHAEnabled(Configuration conf, String nsId) { Map<String, Map<String, InetSocketAddress>> addresses = - DFSUtil.getHaNnRpcAddresses(conf); + DFSUtilClient.getHaNnRpcAddresses(conf); if (addresses == null) return false; Map<String, InetSocketAddress> nnMap = addresses.get(nsId); return nnMap != null && nnMap.size() > 1; @@ -260,47 +246,6 @@ public class HAUtil { } /** - * Locate a delegation token associated with the given HA cluster URI, and if - * one is found, clone it to also represent the underlying namenode address. 
- * @param ugi the UGI to modify - * @param haUri the logical URI for the cluster - * @param nnAddrs collection of NNs in the cluster to which the token - * applies - */ - public static void cloneDelegationTokenForLogicalUri( - UserGroupInformation ugi, URI haUri, - Collection<InetSocketAddress> nnAddrs) { - // this cloning logic is only used by hdfs - Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri, - HdfsConstants.HDFS_URI_SCHEME); - Token<DelegationTokenIdentifier> haToken = - tokenSelector.selectToken(haService, ugi.getTokens()); - if (haToken != null) { - for (InetSocketAddress singleNNAddr : nnAddrs) { - // this is a minor hack to prevent physical HA tokens from being - // exposed to the user via UGI.getCredentials(), otherwise these - // cloned tokens may be inadvertently propagated to jobs - Token<DelegationTokenIdentifier> specificToken = - haToken.privateClone(buildTokenService(singleNNAddr)); - Text alias = new Text( - HAUtilClient.buildTokenServicePrefixForLogicalUri( - HdfsConstants.HDFS_URI_SCHEME) - + "//" + specificToken.getService()); - ugi.addToken(alias, specificToken); - if (LOG.isDebugEnabled()) { - LOG.debug("Mapped HA service delegation token for logical URI " + - haUri + " to namenode " + singleNNAddr); - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("No HA service delegation token found for logical URI " + - haUri); - } - } - } - - /** * Get the internet address of the currently-active NN. 
This should rarely be * used, since callers of this method who connect directly to the NN using the * resulting InetSocketAddress will not be able to connect to the active NN if http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java index 61d701d..d556c90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.io.Text; @@ -112,7 +113,7 @@ public class NameNodeProxies { throws IOException { AbstractNNFailoverProxyProvider<T> failoverProxyProvider = NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri, - xface, true, fallbackToSimpleAuth); + xface, true, fallbackToSimpleAuth, new NameNodeHAProxyFactory<T>()); if (failoverProxyProvider == null) { return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri), http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java index 8edaed6..6b489fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java @@ -17,19 +17,13 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import java.io.IOException; -import java.net.InetSocketAddress; - -import javax.servlet.ServletContext; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.NameNodeProxies; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; @@ -63,25 +57,6 @@ abstract class DfsServlet extends HttpServlet { doc.endTag(); } - /** - * Create a {@link NameNode} proxy from the current {@link ServletContext}. 
- */ - protected ClientProtocol createNameNodeProxy() throws IOException { - ServletContext context = getServletContext(); - // if we are running in the Name Node, use it directly rather than via - // rpc - NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context); - if (nn != null) { - return nn.getRpcServer(); - } - InetSocketAddress nnAddr = - NameNodeHttpServer.getNameNodeAddressFromContext(context); - Configuration conf = new HdfsConfiguration( - NameNodeHttpServer.getConfFromContext(context)); - return NameNodeProxies.createProxy(conf, DFSUtilClient.getNNUri(nnAddr), - ClientProtocol.class).getProxy(); - } - protected UserGroupInformation getUGI(HttpServletRequest request, Configuration conf) throws IOException { return JspHelper.getUGI(getServletContext(), request, conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java deleted file mode 100644 index 0e8fa44..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode.ha; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HAUtil; -import org.apache.hadoop.hdfs.NameNodeProxies; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -/** - * A FailoverProxyProvider implementation which allows one to configure - * multiple URIs to connect to during fail-over. A random configured address is - * tried first, and on a fail-over event the other addresses are tried - * sequentially in a random order. 
- */ -public class ConfiguredFailoverProxyProvider<T> extends - AbstractNNFailoverProxyProvider<T> { - - private static final Log LOG = - LogFactory.getLog(ConfiguredFailoverProxyProvider.class); - - interface ProxyFactory<T> { - T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface, - UserGroupInformation ugi, boolean withRetries, - AtomicBoolean fallbackToSimpleAuth) throws IOException; - } - - static class DefaultProxyFactory<T> implements ProxyFactory<T> { - @Override - public T createProxy(Configuration conf, InetSocketAddress nnAddr, - Class<T> xface, UserGroupInformation ugi, boolean withRetries, - AtomicBoolean fallbackToSimpleAuth) throws IOException { - return NameNodeProxies.createNonHAProxy(conf, - nnAddr, xface, ugi, false, fallbackToSimpleAuth).getProxy(); - } - } - - protected final Configuration conf; - protected final List<AddressRpcProxyPair<T>> proxies = - new ArrayList<AddressRpcProxyPair<T>>(); - private final UserGroupInformation ugi; - protected final Class<T> xface; - - private int currentProxyIndex = 0; - private final ProxyFactory<T> factory; - - public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, - Class<T> xface) { - this(conf, uri, xface, new DefaultProxyFactory<T>()); - } - - @VisibleForTesting - ConfiguredFailoverProxyProvider(Configuration conf, URI uri, - Class<T> xface, ProxyFactory<T> factory) { - - Preconditions.checkArgument( - xface.isAssignableFrom(NamenodeProtocols.class), - "Interface class %s is not a valid NameNode protocol!"); - this.xface = xface; - - this.conf = new Configuration(conf); - int maxRetries = this.conf.getInt( - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY, - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT); - this.conf.setInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, - maxRetries); - - int maxRetriesOnSocketTimeouts = this.conf.getInt( - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - 
HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); - this.conf.setInt( - CommonConfigurationKeysPublic - .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - maxRetriesOnSocketTimeouts); - - try { - ugi = UserGroupInformation.getCurrentUser(); - - Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses( - conf); - Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost()); - - if (addressesInNN == null || addressesInNN.size() == 0) { - throw new RuntimeException("Could not find any configured addresses " + - "for URI " + uri); - } - - Collection<InetSocketAddress> addressesOfNns = addressesInNN.values(); - for (InetSocketAddress address : addressesOfNns) { - proxies.add(new AddressRpcProxyPair<T>(address)); - } - // Randomize the list to prevent all clients pointing to the same one - boolean randomized = conf.getBoolean( - HdfsClientConfigKeys.Failover.RANDOM_ORDER, - HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT); - if (randomized) { - Collections.shuffle(proxies); - } - - // The client may have a delegation token set for the logical - // URI of the cluster. Clone this token to apply to each of the - // underlying IPC addresses so that the IPC code can find it. - HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns); - this.factory = factory; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public Class<T> getInterface() { - return xface; - } - - /** - * Lazily initialize the RPC proxy object. 
- */ - @Override - public synchronized ProxyInfo<T> getProxy() { - AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex); - if (current.namenode == null) { - try { - current.namenode = factory.createProxy(conf, - current.address, xface, ugi, false, getFallbackToSimpleAuth()); - } catch (IOException e) { - LOG.error("Failed to create RPC proxy to NameNode", e); - throw new RuntimeException(e); - } - } - return new ProxyInfo<T>(current.namenode, current.address.toString()); - } - - @Override - public void performFailover(T currentProxy) { - incrementProxyIndex(); - } - - synchronized void incrementProxyIndex() { - currentProxyIndex = (currentProxyIndex + 1) % proxies.size(); - } - - /** - * A little pair object to store the address and connected RPC proxy object to - * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null. - */ - private static class AddressRpcProxyPair<T> { - public final InetSocketAddress address; - public T namenode; - - public AddressRpcProxyPair(InetSocketAddress address) { - this.address = address; - } - } - - /** - * Close all the proxy objects which have been opened over the lifetime of - * this proxy provider. - */ - @Override - public synchronized void close() throws IOException { - for (AddressRpcProxyPair<T> proxy : proxies) { - if (proxy.namenode != null) { - if (proxy.namenode instanceof Closeable) { - ((Closeable)proxy.namenode).close(); - } else { - RPC.stopProxy(proxy.namenode); - } - } - } - } - - /** - * Logical URI is required for this failover proxy provider. 
- */ - @Override - public boolean useLogicalURI() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java deleted file mode 100644 index 4e1cb9e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.namenode.ha; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.NameNodeProxies; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; - -import com.google.common.base.Preconditions; - -/** - * A NNFailoverProxyProvider implementation which works on IP failover setup. - * Only one proxy is used to connect to both servers and switching between - * the servers is done by the environment/infrastructure, which guarantees - * clients can consistently reach only one node at a time. - * - * Clients with a live connection will likely get connection reset after an - * IP failover. This case will be handled by the - * FailoverOnNetworkExceptionRetry retry policy. I.e. if the call is - * not idempotent, it won't get retried. - * - * A connection reset while setting up a connection (i.e. before sending a - * request) will be handled in ipc client. - * - * The namenode URI must contain a resolvable host name. 
- */ -public class IPFailoverProxyProvider<T> extends - AbstractNNFailoverProxyProvider<T> { - private final Configuration conf; - private final Class<T> xface; - private final URI nameNodeUri; - private ProxyInfo<T> nnProxyInfo = null; - - public IPFailoverProxyProvider(Configuration conf, URI uri, - Class<T> xface) { - Preconditions.checkArgument( - xface.isAssignableFrom(NamenodeProtocols.class), - "Interface class %s is not a valid NameNode protocol!"); - this.xface = xface; - this.nameNodeUri = uri; - - this.conf = new Configuration(conf); - int maxRetries = this.conf.getInt( - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY, - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT); - this.conf.setInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, - maxRetries); - - int maxRetriesOnSocketTimeouts = this.conf.getInt( - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); - this.conf.setInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - maxRetriesOnSocketTimeouts); - } - - @Override - public Class<T> getInterface() { - return xface; - } - - @Override - public synchronized ProxyInfo<T> getProxy() { - // Create a non-ha proxy if not already created. 
- if (nnProxyInfo == null) { - try { - // Create a proxy that is not wrapped in RetryProxy - InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri); - nnProxyInfo = new ProxyInfo<T>(NameNodeProxies.createNonHAProxy( - conf, nnAddr, xface, UserGroupInformation.getCurrentUser(), - false).getProxy(), nnAddr.toString()); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - return nnProxyInfo; - } - - /** Nothing to do for IP failover */ - @Override - public void performFailover(T currentProxy) { - } - - /** - * Close the proxy, - */ - @Override - public synchronized void close() throws IOException { - if (nnProxyInfo == null) { - return; - } - if (nnProxyInfo.proxy instanceof Closeable) { - ((Closeable)nnProxyInfo.proxy).close(); - } else { - RPC.stopProxy(nnProxyInfo.proxy); - } - } - - /** - * Logical URI is not used for IP failover. - */ - @Override - public boolean useLogicalURI() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java new file mode 100644 index 0000000..036b6eb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +public class NameNodeHAProxyFactory<T> implements HAProxyFactory<T> { + + @Override + public T createProxy(Configuration conf, InetSocketAddress nnAddr, + Class<T> xface, UserGroupInformation ugi, boolean withRetries, + AtomicBoolean fallbackToSimpleAuth) throws IOException { + return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface, + ugi, withRetries, fallbackToSimpleAuth).getProxy(); + } + + @Override + public T createProxy(Configuration conf, InetSocketAddress nnAddr, + Class<T> xface, UserGroupInformation ugi, boolean withRetries) + throws IOException { + return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface, + ugi, withRetries).getProxy(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
