Author: todd
Date: Tue Jan 17 03:10:25 2012
New Revision: 1232284
URL: http://svn.apache.org/viewvc?rev=1232284&view=rev
Log:
HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol.
Contributed by Uma Maheswara Rao G.
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
Tue Jan 17 03:10:25 2012
@@ -109,3 +109,5 @@ HDFS-2789. TestHAAdmin.testFailover is f
HDFS-2747. Entering safe mode after starting SBN can NPE. (Uma Maheswara Rao G
via todd)
HDFS-2772. On transition to active, standby should not swallow ELIE. (atm)
+
+HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol.
(Uma Maheswara Rao G via todd)
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
Tue Jan 17 03:10:25 2012
@@ -94,9 +94,6 @@ import org.apache.hadoop.io.EnumSetWrita
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -109,7 +106,6 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
@@ -312,20 +308,10 @@ public class DFSClient implements java.i
this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
-
- Class<?> failoverProxyProviderClass = getFailoverProxyProviderClass(
- nameNodeUri, conf);
-
- if (nameNodeUri != null && failoverProxyProviderClass != null) {
- FailoverProxyProvider failoverProxyProvider = (FailoverProxyProvider)
- ReflectionUtils.newInstance(failoverProxyProviderClass, conf);
- this.namenode = (ClientProtocol)RetryProxy.create(ClientProtocol.class,
- failoverProxyProvider,
- RetryPolicies.failoverOnNetworkException(
- RetryPolicies.TRY_ONCE_THEN_FAIL,
- dfsClientConf.maxFailoverAttempts,
- dfsClientConf.failoverSleepBaseMillis,
- dfsClientConf.failoverSleepMaxMillis));
+ ClientProtocol failoverNNProxy = (ClientProtocol) HAUtil
+ .createFailoverProxy(conf, nameNodeUri, ClientProtocol.class);
+ if (nameNodeUri != null && failoverNNProxy != null) {
+ this.namenode = failoverNNProxy;
nnAddress = null;
} else if (nameNodeUri != null && rpcNamenode == null) {
this.namenode = DFSUtil.createNamenode(NameNode.getAddress(nameNodeUri),
conf);
@@ -353,39 +339,6 @@ public class DFSClient implements java.i
LOG.debug("Short circuit read is " + shortCircuitLocalReads);
}
}
-
- private Class<?> getFailoverProxyProviderClass(URI nameNodeUri,
Configuration conf)
- throws IOException {
- if (nameNodeUri == null) {
- return null;
- }
- String host = nameNodeUri.getHost();
-
- String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." +
host;
- try {
- Class<?> ret = conf.getClass(configKey, null);
- if (ret != null) {
- // If we found a proxy provider, then this URI should be a logical NN.
- // Given that, it shouldn't have a non-default port number.
- int port = nameNodeUri.getPort();
- if (port > 0 && port != NameNode.DEFAULT_PORT) {
- throw new IOException(
- "Port " + port + " specified in URI " + nameNodeUri +
- " but host '" + host + "' is a logical (HA) namenode" +
- " and does not use port information.");
- }
- }
- return ret;
- } catch (RuntimeException e) {
- if (e.getCause() instanceof ClassNotFoundException) {
- throw new IOException("Could not load failover proxy provider class "
- + conf.get(configKey) + " which is configured for authority " +
nameNodeUri,
- e);
- } else {
- throw e;
- }
- }
- }
/**
* Return the number of times the client should go back to the namenode
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
Tue Jan 17 03:10:25 2012
@@ -28,10 +28,12 @@ import java.security.SecureRandom;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
import javax.net.SocketFactory;
@@ -47,7 +49,12 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
@@ -810,6 +817,32 @@ public class DFSUtil {
}
/**
+ * Build a NamenodeProtocol connection to the namenode and set up the retry
+ * policy
+ */
+ public static NamenodeProtocolTranslatorPB createNNProxyWithNamenodeProtocol(
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+ throws IOException {
+ RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
+ TimeUnit.MILLISECONDS);
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
+ = new HashMap<Class<? extends Exception>, RetryPolicy>();
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(timeoutPolicy,
+ exceptionToPolicyMap);
+ Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String,
RetryPolicy>();
+ methodNameToPolicyMap.put("getBlocks", methodPolicy);
+ methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
+ RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class, RPC
+ .getProtocolVersion(NamenodeProtocolPB.class), address, ugi, conf,
+ NetUtils.getDefaultSocketFactory(conf));
+ NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
+ NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
+ return new NamenodeProtocolTranslatorPB(retryProxy);
+ }
+
+ /**
* Get nameservice Id for the {@link NameNode} based on namenode RPC address
* matching the local node address.
*/
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
Tue Jan 17 03:10:25 2012
@@ -18,13 +18,23 @@
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-
+import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.net.URI;
import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
public class HAUtil {
private HAUtil() { /* Hidden constructor */ }
@@ -110,5 +120,84 @@ public class HAUtil {
public static void setAllowStandbyReads(Configuration conf, boolean val) {
conf.setBoolean("dfs.ha.allow.stale.reads", val);
}
+
+ /** Creates the Failover proxy provider instance*/
+ @SuppressWarnings("unchecked")
+ public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
+ Configuration conf, Class<FailoverProxyProvider<?>>
failoverProxyProviderClass,
+ Class xface) throws IOException {
+ Preconditions.checkArgument(
+ xface.isAssignableFrom(NamenodeProtocols.class),
+ "Interface %s is not a NameNode protocol", xface);
+ try {
+ Constructor<FailoverProxyProvider<?>> ctor = failoverProxyProviderClass
+ .getConstructor(Class.class);
+ FailoverProxyProvider<?> provider = ctor.newInstance(xface);
+ ReflectionUtils.setConf(provider, conf);
+ return (FailoverProxyProvider<T>) provider;
+ } catch (Exception e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new IOException(
+ "Couldn't create proxy provider " + failoverProxyProviderClass, e);
+ }
+ }
+ }
+ /** Gets the configured Failover proxy provider's class */
+ public static <T> Class<FailoverProxyProvider<T>>
getFailoverProxyProviderClass(
+ Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
+ if (nameNodeUri == null) {
+ return null;
+ }
+ String host = nameNodeUri.getHost();
+
+ String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "."
+ + host;
+ try {
+ @SuppressWarnings("unchecked")
+ Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>)
conf
+ .getClass(configKey, null, FailoverProxyProvider.class);
+ if (ret != null) {
+ // If we found a proxy provider, then this URI should be a logical NN.
+ // Given that, it shouldn't have a non-default port number.
+ int port = nameNodeUri.getPort();
+ if (port > 0 && port != NameNode.DEFAULT_PORT) {
+ throw new IOException("Port " + port + " specified in URI "
+ + nameNodeUri + " but host '" + host
+ + "' is a logical (HA) namenode"
+ + " and does not use port information.");
+ }
+ }
+ return ret;
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof ClassNotFoundException) {
+ throw new IOException("Could not load failover proxy provider class "
+ + conf.get(configKey) + " which is configured for authority "
+ + nameNodeUri, e);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /** Creates the namenode proxy with the passed Protocol */
+ @SuppressWarnings("unchecked")
+ public static Object createFailoverProxy(Configuration conf, URI nameNodeUri,
+ Class xface) throws IOException {
+ Class<FailoverProxyProvider<?>> failoverProxyProviderClass = HAUtil
+ .getFailoverProxyProviderClass(conf, nameNodeUri, xface);
+ if (failoverProxyProviderClass != null) {
+ FailoverProxyProvider<?> failoverProxyProvider = HAUtil
+ .createFailoverProxyProvider(conf, failoverProxyProviderClass,
xface);
+ Conf config = new Conf(conf);
+ return RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
+ .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+ config.maxFailoverAttempts, config.failoverSleepBaseMillis,
+ config.failoverSleepMaxMillis));
+ }
+ return null;
+ }
+
}
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
Tue Jan 17 03:10:25 2012
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@@ -58,7 +57,6 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
-import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
/**
@@ -88,7 +86,8 @@ class NameNodeConnector {
InetSocketAddress nn = Lists.newArrayList(haNNs).get(0);
// TODO(HA): need to deal with connecting to HA NN pair here
this.namenodeAddress = nn;
- this.namenode = createNamenode(nn, conf);
+ this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(nn, conf,
+ UserGroupInformation.getCurrentUser());
this.client = DFSUtil.createNamenode(conf);
this.fs = FileSystem.get(NameNode.getUri(nn), conf);
@@ -196,33 +195,6 @@ class NameNodeConnector {
+ "]";
}
- /** Build a NamenodeProtocol connection to the namenode and
- * set up the retry policy
- */
- private static NamenodeProtocol createNamenode(InetSocketAddress address,
- Configuration conf) throws IOException {
- RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
- 5, 200, TimeUnit.MILLISECONDS);
- Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
- new HashMap<Class<? extends Exception>, RetryPolicy>();
- RetryPolicy methodPolicy = RetryPolicies.retryByException(
- timeoutPolicy, exceptionToPolicyMap);
- Map<String,RetryPolicy> methodNameToPolicyMap =
- new HashMap<String, RetryPolicy>();
- methodNameToPolicyMap.put("getBlocks", methodPolicy);
- methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
-
- RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
- ProtobufRpcEngine.class);
- NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class,
- RPC.getProtocolVersion(NamenodeProtocolPB.class), address,
- UserGroupInformation.getCurrentUser(), conf,
- NetUtils.getDefaultSocketFactory(conf));
- NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
- NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
- return new NamenodeProtocolTranslatorPB(retryProxy);
- }
-
/**
* Periodically updates access keys.
*/
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
Tue Jan 17 03:10:25 2012
@@ -32,52 +32,75 @@ import org.apache.hadoop.fs.CommonConfig
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
+import com.google.common.base.Preconditions;
+
/**
* A FailoverProxyProvider implementation which allows one to configure two
URIs
* to connect to during fail-over. The first configured address is tried first,
* and on a fail-over event the other address is tried.
*/
-public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
- Configurable {
+public class ConfiguredFailoverProxyProvider<T> implements
+ FailoverProxyProvider<T>, Configurable {
private static final Log LOG =
LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
private Configuration conf;
private int currentProxyIndex = 0;
- private List<AddressRpcProxyPair> proxies = new
ArrayList<AddressRpcProxyPair>();
+ private List<AddressRpcProxyPair<T>> proxies = new
ArrayList<AddressRpcProxyPair<T>>();
private UserGroupInformation ugi;
+ private final Class<T> xface;
+ public ConfiguredFailoverProxyProvider(Class<T> xface) {
+ Preconditions.checkArgument(
+ xface.isAssignableFrom(NamenodeProtocols.class),
+ "Interface class %s is not a valid NameNode protocol!");
+ this.xface = xface;
+ }
+
@Override
- public Class<?> getInterface() {
- return ClientProtocol.class;
+ public Class<T> getInterface() {
+ return xface;
}
/**
* Lazily initialize the RPC proxy object.
*/
+ @SuppressWarnings("unchecked")
@Override
- public synchronized Object getProxy() {
+ public synchronized T getProxy() {
AddressRpcProxyPair current = proxies.get(currentProxyIndex);
if (current.namenode == null) {
try {
- // TODO(HA): This will create a NN proxy with an underlying retry
- // proxy. We don't want this.
- current.namenode = DFSUtil.createNamenode(current.address, conf, ugi);
+ if (NamenodeProtocol.class.equals(xface)) {
+ current.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(
+ current.address, conf, ugi);
+ } else if (ClientProtocol.class.equals(xface)) {
+ // TODO(HA): This will create a NN proxy with an underlying retry
+ // proxy. We don't want this.
+ current.namenode = DFSUtil.createNamenode(current.address, conf,
ugi);
+ } else {
+ throw new IllegalStateException(
+ "Upsupported protocol found when creating the proxy conection to
NameNode. "
+ + ((xface != null) ? xface.getClass().getName() : xface)
+ + " is not supported by " + this.getClass().getName());
+ }
} catch (IOException e) {
LOG.error("Failed to create RPC proxy to NameNode", e);
throw new RuntimeException(e);
}
}
- return current.namenode;
+ return (T)current.namenode;
}
@Override
- public synchronized void performFailover(Object currentProxy) {
+ public synchronized void performFailover(T currentProxy) {
currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
}
@@ -113,7 +136,7 @@ public class ConfiguredFailoverProxyProv
Map<String, InetSocketAddress> addressesInNN = map.get(nsId);
for (InetSocketAddress address : addressesInNN.values()) {
- proxies.add(new AddressRpcProxyPair(address));
+ proxies.add(new AddressRpcProxyPair<T>(address));
}
} catch (IOException e) {
throw new RuntimeException(e);
@@ -124,9 +147,9 @@ public class ConfiguredFailoverProxyProv
* A little pair object to store the address and connected RPC proxy object
to
* an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
*/
- private static class AddressRpcProxyPair {
+ private static class AddressRpcProxyPair<T> {
public InetSocketAddress address;
- public ClientProtocol namenode;
+ public T namenode;
public AddressRpcProxyPair(InetSocketAddress address) {
this.address = address;
@@ -139,7 +162,7 @@ public class ConfiguredFailoverProxyProv
*/
@Override
public synchronized void close() throws IOException {
- for (AddressRpcProxyPair proxy : proxies) {
+ for (AddressRpcProxyPair<T> proxy : proxies) {
if (proxy.namenode != null) {
if (proxy.namenode instanceof Closeable) {
((Closeable)proxy.namenode).close();