This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 32737360a5b27db2fc34a7e89263ace3267e3295
Author: Erik Krogen <[email protected]>
AuthorDate: Thu Sep 20 13:27:58 2018 -0700

    HDFS-13749. [SBN read] Use getServiceStatus to discover observer namenodes. Contributed by Chao Sun.
---
 .../apache/hadoop/hdfs/NameNodeProxiesClient.java  |  42 ++++++++-
 .../ha/AbstractNNFailoverProxyProvider.java        |  36 +++++--
 .../namenode/ha/IPFailoverProxyProvider.java       |   2 +-
 .../namenode/ha/ObserverReadProxyProvider.java     |  49 +---------
 .../namenode/ha/TestObserverReadProxyProvider.java | 105 +++++++++++++------
 5 files changed, 149 insertions(+), 85 deletions(-)
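
At its core, the change replaces RPC probing (reportBadBlocks() to test for
Active, checkAccess() to test for Observer) with a direct query of each
NameNode's HA state via HAServiceProtocol#getServiceStatus(). A minimal sketch
of that discovery pattern, assuming only the Hadoop HA client classes (the
helper name discoverState is illustrative, not part of this patch):

    import java.io.IOException;

    import org.apache.hadoop.ha.HAServiceProtocol;
    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;

    class StateDiscoverySketch {
      // Ask the NameNode for its state directly; an unreachable node is
      // treated as STANDBY so the caller simply moves on to the next one.
      static HAServiceState discoverState(HAServiceProtocol serviceProxy) {
        try {
          return serviceProxy.getServiceStatus().getState();
        } catch (IOException e) {
          return HAServiceState.STANDBY;
        }
      }
    }
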
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
index 284e4ef..b71e84d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
@@ -25,12 +25,16 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,8 +66,9 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
- * Create proxy objects with {@link ClientProtocol} to communicate with a remote
- * NN. Generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
+ * Create proxy objects with {@link ClientProtocol} and
+ * {@link HAServiceProtocol} to communicate with a remote NN. For the former,
+ * generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
  * Configuration, URI, AtomicBoolean)}, which will create either an HA- or
  * non-HA-enabled client proxy as appropriate.
  *
@@ -76,6 +81,11 @@ public class NameNodeProxiesClient {
   private static final Logger LOG = LoggerFactory.getLogger(
       NameNodeProxiesClient.class);
 
+  /** Maximum # of retries for HAProxy with HAServiceProtocol. */
+  private static final int MAX_RETRIES = 3;
+  /** Initial retry delay for HAProxy with HAServiceProtocol. */
+  private static final int DELAY_MILLISECONDS = 200;
+
   /**
    * Wrapper for a client proxy as well as its associated service ID.
    * This is simply used as a tuple-like return type for created NN proxy.
@@ -343,6 +353,34 @@ public class NameNodeProxiesClient {
         fallbackToSimpleAuth, null);
   }
 
+  /**
+   * Creates a non-HA proxy object with {@link HAServiceProtocol} to the
+   * given NameNode address, using the provided configuration. The proxy will
+   * use the RPC timeout configuration specified via {@link
+   * org.apache.hadoop.fs.CommonConfigurationKeys#IPC_CLIENT_RPC_TIMEOUT_KEY}.
+   * Upon failures, this will retry up to certain times with {@link RetryProxy}.
+   *
+   * @param address the NameNode address
+   * @param conf the configuration to be used
+   * @return a non-HA proxy with {@link HAServiceProtocol}.
+   */
+  public static HAServiceProtocol createNonHAProxyWithHAServiceProtocol(
+      InetSocketAddress address, Configuration conf) throws IOException {
+    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
+        MAX_RETRIES, DELAY_MILLISECONDS, TimeUnit.MILLISECONDS);
+
+    HAServiceProtocol proxy =
+        new HAServiceProtocolClientSideTranslatorPB(
+            address, conf, NetUtils.getDefaultSocketFactory(conf),
+            Client.getRpcTimeout(conf));
+    return (HAServiceProtocol) RetryProxy.create(
+        HAServiceProtocol.class,
+        new DefaultFailoverProxyProvider<>(HAServiceProtocol.class, proxy),
+        new HashMap<>(),
+        timeoutPolicy);
+  }
+
   public static ClientProtocol createProxyWithAlignmentContext(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
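
A possible usage sketch for the new createNonHAProxyWithHAServiceProtocol()
factory above. The address is a placeholder; each call through the returned
proxy is retried up to MAX_RETRIES (3) times with exponential backoff starting
at DELAY_MILLISECONDS (200 ms), per the RetryProxy wrapping shown in the hunk:

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ha.HAServiceProtocol;
    import org.apache.hadoop.hdfs.NameNodeProxiesClient;

    public class ServiceStatusProbe {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Placeholder; substitute a real NameNode RPC endpoint.
        InetSocketAddress addr = new InetSocketAddress("nn1.example.com", 8020);
        HAServiceProtocol proxy = NameNodeProxiesClient
            .createNonHAProxyWithHAServiceProtocol(addr, conf);
        // Prints ACTIVE, STANDBY, or OBSERVER depending on the node's state.
        System.out.println(proxy.getServiceStatus().getState());
      }
    }
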
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 32edb36..1b5ad16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -28,11 +28,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HAUtilClient;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -119,23 +122,44 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
      */
     private HAServiceState cachedState;
 
-    public NNProxyInfo(InetSocketAddress address) {
+    /** Proxy for getting HA service status from the given NameNode. */
+    private HAServiceProtocol serviceProxy;
+
+    public NNProxyInfo(InetSocketAddress address, Configuration conf) {
       super(null, address.toString());
       this.address = address;
+      try {
+        serviceProxy = NameNodeProxiesClient
+            .createNonHAProxyWithHAServiceProtocol(address, conf);
+      } catch (IOException ioe) {
+        LOG.error("Failed to create HAServiceProtocol proxy to NameNode"
+            + " at {}", address, ioe);
+        throw new RuntimeException(ioe);
+      }
     }
 
     public InetSocketAddress getAddress() {
       return address;
     }
 
-    public void setCachedState(HAServiceState state) {
-      cachedState = state;
+    public void refreshCachedState() {
+      try {
+        cachedState = serviceProxy.getServiceStatus().getState();
+      } catch (IOException e) {
+        LOG.warn("Failed to connect to {}. Setting cached state to Standby",
+            address, e);
+        cachedState = HAServiceState.STANDBY;
+      }
     }
 
     public HAServiceState getCachedState() {
       return cachedState;
     }
+
+    @VisibleForTesting
+    public void setServiceProxyForTesting(HAServiceProtocol proxy) {
+      this.serviceProxy = proxy;
+    }
   }
 
   @Override
@@ -153,8 +177,8 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
         pi.proxy = factory.createProxy(conf, pi.getAddress(), xface, ugi,
             false, getFallbackToSimpleAuth());
       } catch (IOException ioe) {
-        LOG.error("{} Failed to create RPC proxy to NameNode",
-            this.getClass().getSimpleName(), ioe);
+        LOG.error("{} Failed to create RPC proxy to NameNode at {}",
+            this.getClass().getSimpleName(), pi.address, ioe);
         throw new RuntimeException(ioe);
       }
     }
@@ -178,7 +202,7 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
 
     Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
     for (InetSocketAddress address : addressesOfNns) {
-      proxies.add(new NNProxyInfo<T>(address));
+      proxies.add(new NNProxyInfo<T>(address, conf));
     }
     // Randomize the list to prevent all clients pointing to the same one
     boolean randomized = getRandomOrder(conf, uri);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
index e703740..8062e79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
@@ -48,7 +48,7 @@ public class IPFailoverProxyProvider<T> extends
   public IPFailoverProxyProvider(Configuration conf, URI uri,
       Class<T> xface, HAProxyFactory<T> factory) {
     super(conf, uri, xface, factory);
-    this.nnProxyInfo = new NNProxyInfo<T>(DFSUtilClient.getNNAddress(uri));
+    this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri), conf);
   }
 
   @Override
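
The failure path of NNProxyInfo#refreshCachedState() above can be exercised
without a running NameNode: the status query fails after the retries are
exhausted, and the cached state falls back to STANDBY instead of propagating
the error. A sketch under that assumption (nothing listening at the
placeholder address; the type parameter is arbitrary):

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider.NNProxyInfo;

    public class CachedStateSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Placeholder address where no NameNode is expected to listen.
        NNProxyInfo<Object> pi =
            new NNProxyInfo<>(new InetSocketAddress("localhost", 1), conf);
        pi.refreshCachedState();  // logs a warning, does not throw
        System.out.println(pi.getCachedState());  // STANDBY
      }
    }
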
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index e819282..690ee0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -27,12 +27,10 @@ import java.net.URI;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.ClientGSIContext;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.io.retry.AtMostOnce;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -40,8 +38,6 @@ import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.StandbyException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -181,49 +177,6 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     return lastProxy;
   }
 
-  private static <T extends ClientProtocol> HAServiceState getServiceState(
-      NNProxyInfo<T> pi) {
-    // TODO: should introduce new ClientProtocol method to verify the
-    // underlying service state, which does not require superuser access
-    // The is a workaround
-    IOException ioe = null;
-    try {
-      // Verify write access first
-      pi.proxy.reportBadBlocks(new LocatedBlock[0]);
-      return HAServiceState.ACTIVE; // Only active NameNode allows write
-    } catch (RemoteException re) {
-      IOException sbe = re.unwrapRemoteException(StandbyException.class);
-      if (!(sbe instanceof StandbyException)) {
-        ioe = re;
-      }
-    } catch (IOException e) {
-      ioe = e;
-    }
-    if (ioe != null) {
-      LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
-      return HAServiceState.STANDBY; // Just assume standby in this case
-                                     // Anything besides observer is fine
-    }
-    // Verify read access
-    // For now we assume only Observer nodes allow reads
-    // Stale reads on StandbyNode should be turned off
-    try {
-      pi.proxy.checkAccess("/", FsAction.READ);
-      return HAServiceState.OBSERVER;
-    } catch (RemoteException re) {
-      IOException sbe = re.unwrapRemoteException(StandbyException.class);
-      if (!(sbe instanceof StandbyException)) {
-        ioe = re;
-      }
-    } catch (IOException e) {
-      ioe = e;
-    }
-    if (ioe != null) {
-      LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
-    }
-    return HAServiceState.STANDBY;
-  }
-
   /**
    * Return the currently used proxy. If there is none, first calls
    * {@link #changeProxy(NNProxyInfo)} to initialize one.
@@ -254,7 +207,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     currentProxy = null;
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getServiceState(currentProxy));
+    currentProxy.refreshCachedState();
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
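
The test changes below inject Mockito mocks through the new
setServiceProxyForTesting() hook. In isolation, the stubbing pattern looks
roughly like this, assuming Mockito and JUnit 4 on the classpath (class and
method names here are illustrative):

    import static org.junit.Assert.assertEquals;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.io.IOException;

    import org.apache.hadoop.ha.HAServiceProtocol;
    import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
    import org.apache.hadoop.ha.HAServiceStatus;
    import org.junit.Test;

    public class ServiceStatusMockSketch {
      @Test
      public void testStubbedObserverState() throws IOException {
        // Stub getServiceStatus() to report Observer state.
        HAServiceStatus status = mock(HAServiceStatus.class);
        when(status.getState()).thenReturn(HAServiceState.OBSERVER);
        HAServiceProtocol service = mock(HAServiceProtocol.class);
        when(service.getServiceStatus()).thenReturn(status);
        // A provider holding this mock would classify the node as Observer.
        assertEquals(HAServiceState.OBSERVER,
            service.getServiceStatus().getState());
      }
    }
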
"none" : initial.proxyInfo, currentProxy.proxyInfo); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java index 4d5bc13..3f56c96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java @@ -22,10 +22,13 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -38,10 +41,12 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; - +import static org.mockito.Mockito.when; /** * Tests for {@link ObserverReadProxyProvider} under various configurations of @@ -56,7 +61,7 @@ public class TestObserverReadProxyProvider { private Configuration conf; private ObserverReadProxyProvider<ClientProtocol> proxyProvider; - private ClientProtocolAnswer[] namenodeAnswers; + private NameNodeAnswer[] namenodeAnswers; private String[] namenodeAddrs; @Before @@ -70,32 +75,53 @@ public class TestObserverReadProxyProvider { private void setupProxyProvider(int namenodeCount) throws Exception { String[] namenodeIDs = new String[namenodeCount]; namenodeAddrs = new String[namenodeCount]; - namenodeAnswers = new ClientProtocolAnswer[namenodeCount]; + namenodeAnswers = new NameNodeAnswer[namenodeCount]; ClientProtocol[] proxies = new ClientProtocol[namenodeCount]; Map<String, ClientProtocol> proxyMap = new HashMap<>(); + HAServiceProtocol[] serviceProxies = new HAServiceProtocol[namenodeCount]; + Map<String, HAServiceProtocol> serviceProxyMap = new HashMap<>(); for (int i = 0; i < namenodeCount; i++) { namenodeIDs[i] = "nn" + i; namenodeAddrs[i] = "namenode" + i + ".test:8020"; conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + "." + namenodeIDs[i], namenodeAddrs[i]); - namenodeAnswers[i] = new ClientProtocolAnswer(); + namenodeAnswers[i] = new NameNodeAnswer(); proxies[i] = mock(ClientProtocol.class); - doWrite(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i])); - doRead(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i])); + doWrite(Mockito.doAnswer(namenodeAnswers[i].clientAnswer) + .when(proxies[i])); + doRead(Mockito.doAnswer(namenodeAnswers[i].clientAnswer) + .when(proxies[i])); + serviceProxies[i] = mock(HAServiceProtocol.class); + Mockito.doAnswer(namenodeAnswers[i].serviceAnswer) + .when(serviceProxies[i]).getServiceStatus(); proxyMap.put(namenodeAddrs[i], proxies[i]); + serviceProxyMap.put(namenodeAddrs[i], serviceProxies[i]); } conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." 
@@ -275,39 +301,62 @@ public class TestObserverReadProxyProvider {
   }
 
   /**
-   * An {@link Answer} used for mocking of a {@link ClientProtocol}. Setting
-   * the state or unreachability of this Answer will make the linked
-   * ClientProtocol respond as if it was communicating with a NameNode of
-   * the corresponding state. It is in Standby state by default.
+   * An {@link Answer} used for mocking of {@link ClientProtocol} and
+   * {@link HAServiceProtocol}. Setting the state or unreachability of this
+   * Answer will make the linked ClientProtocol respond as if it was
+   * communicating with a NameNode of the corresponding state. It is in
+   * Standby state by default.
   */
-  private static class ClientProtocolAnswer implements Answer<Void> {
+  private static class NameNodeAnswer {
 
     private volatile boolean unreachable = false;
     // Standby state by default
     private volatile boolean allowWrites = false;
     private volatile boolean allowReads = false;
 
-    @Override
-    public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-      if (unreachable) {
-        throw new IOException("Unavailable");
+    private ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer();
+    private HAServiceProtocolAnswer serviceAnswer =
+        new HAServiceProtocolAnswer();
+
+    private class HAServiceProtocolAnswer implements Answer<HAServiceStatus> {
+      @Override
+      public HAServiceStatus answer(InvocationOnMock invocation)
+          throws Throwable {
+        HAServiceStatus status = mock(HAServiceStatus.class);
+        if (allowReads && allowWrites) {
+          when(status.getState()).thenReturn(HAServiceState.ACTIVE);
+        } else if (allowReads) {
+          when(status.getState()).thenReturn(HAServiceState.OBSERVER);
+        } else {
+          when(status.getState()).thenReturn(HAServiceState.STANDBY);
+        }
+        return status;
       }
-      switch (invocationOnMock.getMethod().getName()) {
+    }
+
+    private class ClientProtocolAnswer implements Answer<Void> {
+      @Override
+      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+        if (unreachable) {
+          throw new IOException("Unavailable");
+        }
+        switch (invocationOnMock.getMethod().getName()) {
         case "reportBadBlocks":
           if (!allowWrites) {
-            throw new RemoteException(StandbyException.class.getCanonicalName(),
-                "No writes!");
+            throw new RemoteException(
+                StandbyException.class.getCanonicalName(), "No writes!");
           }
           return null;
         case "checkAccess":
          if (!allowReads) {
-            throw new RemoteException(StandbyException.class.getCanonicalName(),
-                "No reads!");
+            throw new RemoteException(
+                StandbyException.class.getCanonicalName(), "No reads!");
          }
          return null;
        default:
          throw new IllegalArgumentException(
              "Only reportBadBlocks and checkAccess supported!");
+        }
      }
    }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
