Repository: hadoop Updated Branches: refs/heads/trunk 19b89c4c7 -> 9e0e430f1
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e0e430f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java deleted file mode 100644 index 2f6c9bc..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java +++ /dev/null @@ -1,241 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode.ha; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.StandbyException; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.io.retry.MultiException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A FailoverProxyProvider implementation that technically does not "failover" - * per-se. It constructs a wrapper proxy that sends the request to ALL - * underlying proxies simultaneously. It assumes the in an HA setup, there will - * be only one Active, and the active should respond faster than any configured - * standbys. Once it receive a response from any one of the configred proxies, - * outstanding requests to other proxies are immediately cancelled. - */ -public class RequestHedgingProxyProvider<T> extends - ConfiguredFailoverProxyProvider<T> { - - public static final Logger LOG = - LoggerFactory.getLogger(RequestHedgingProxyProvider.class); - - class RequestHedgingInvocationHandler implements InvocationHandler { - - final Map<String, ProxyInfo<T>> targetProxies; - - public RequestHedgingInvocationHandler( - Map<String, ProxyInfo<T>> targetProxies) { - this.targetProxies = new HashMap<>(targetProxies); - } - - /** - * Creates a Executor and invokes all proxies concurrently. This - * implementation assumes that Clients have configured proper socket - * timeouts, else the call can block forever. - * - * @param proxy - * @param method - * @param args - * @return - * @throws Throwable - */ - @Override - public Object - invoke(Object proxy, final Method method, final Object[] args) - throws Throwable { - Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>(); - int numAttempts = 0; - - ExecutorService executor = null; - CompletionService<Object> completionService; - try { - // Optimization : if only 2 proxies are configured and one had failed - // over, then we dont need to create a threadpool etc. - targetProxies.remove(toIgnore); - if (targetProxies.size() == 1) { - ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next(); - Object retVal = method.invoke(proxyInfo.proxy, args); - successfulProxy = proxyInfo; - return retVal; - } - executor = Executors.newFixedThreadPool(proxies.size()); - completionService = new ExecutorCompletionService<>(executor); - for (final Map.Entry<String, ProxyInfo<T>> pEntry : - targetProxies.entrySet()) { - Callable<Object> c = new Callable<Object>() { - @Override - public Object call() throws Exception { - LOG.trace("Invoking method {} on proxy {}", method, - pEntry.getValue().proxyInfo); - return method.invoke(pEntry.getValue().proxy, args); - } - }; - proxyMap.put(completionService.submit(c), pEntry.getValue()); - numAttempts++; - } - - Map<String, Exception> badResults = new HashMap<>(); - while (numAttempts > 0) { - Future<Object> callResultFuture = completionService.take(); - Object retVal; - try { - retVal = callResultFuture.get(); - successfulProxy = proxyMap.get(callResultFuture); - LOG.debug("Invocation successful on [{}]", - successfulProxy.proxyInfo); - return retVal; - } catch (Exception ex) { - ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture); - logProxyException(ex, tProxyInfo.proxyInfo); - badResults.put(tProxyInfo.proxyInfo, unwrapException(ex)); - LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo); - numAttempts--; - } - } - - // At this point we should have All bad results (Exceptions) - // Or should have returned with successful result. - if (badResults.size() == 1) { - throw badResults.values().iterator().next(); - } else { - throw new MultiException(badResults); - } - } finally { - if (executor != null) { - LOG.trace("Shutting down threadpool executor"); - executor.shutdownNow(); - } - } - } - } - - - private volatile ProxyInfo<T> successfulProxy = null; - private volatile String toIgnore = null; - - public RequestHedgingProxyProvider( - Configuration conf, URI uri, Class<T> xface) { - this(conf, uri, xface, new DefaultProxyFactory<T>()); - } - - @VisibleForTesting - RequestHedgingProxyProvider(Configuration conf, URI uri, - Class<T> xface, ProxyFactory<T> factory) { - super(conf, uri, xface, factory); - } - - @SuppressWarnings("unchecked") - @Override - public synchronized ProxyInfo<T> getProxy() { - if (successfulProxy != null) { - return successfulProxy; - } - Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>(); - StringBuilder combinedInfo = new StringBuilder("["); - for (int i = 0; i < proxies.size(); i++) { - ProxyInfo<T> pInfo = super.getProxy(); - incrementProxyIndex(); - targetProxyInfos.put(pInfo.proxyInfo, pInfo); - combinedInfo.append(pInfo.proxyInfo).append(','); - } - combinedInfo.append(']'); - T wrappedProxy = (T) Proxy.newProxyInstance( - RequestHedgingInvocationHandler.class.getClassLoader(), - new Class<?>[]{xface}, - new RequestHedgingInvocationHandler(targetProxyInfos)); - return new ProxyInfo<T>(wrappedProxy, combinedInfo.toString()); - } - - @Override - public synchronized void performFailover(T currentProxy) { - toIgnore = successfulProxy.proxyInfo; - successfulProxy = null; - } - - /** - * Check the exception returned by the proxy log a warning message if it's - * not a StandbyException (expected exception). - * @param ex Exception to evaluate. - * @param proxyInfo Information of the proxy reporting the exception. - */ - private void logProxyException(Exception ex, String proxyInfo) { - if (isStandbyException(ex)) { - LOG.debug("Invocation returned standby exception on [{}]", proxyInfo); - } else { - LOG.warn("Invocation returned exception on [{}]", proxyInfo); - } - } - - /** - * Check if the returned exception is caused by an standby namenode. - * @param ex Exception to check. - * @return If the exception is caused by an standby namenode. - */ - private boolean isStandbyException(Exception ex) { - Exception exception = unwrapException(ex); - if (exception instanceof RemoteException) { - return ((RemoteException) exception).unwrapRemoteException() - instanceof StandbyException; - } - return false; - } - - /** - * Unwraps the exception. <p> - * Example: - * <blockquote><pre> - * if ex is - * ExecutionException(InvocationTargetExeption(SomeException)) - * returns SomeException - * </pre></blockquote> - * - * @return unwrapped exception - */ - private Exception unwrapException(Exception ex) { - if (ex != null) { - Throwable cause = ex.getCause(); - if (cause instanceof Exception) { - Throwable innerCause = cause.getCause(); - if (innerCause instanceof Exception) { - return (Exception) innerCause; - } - return (Exception) cause; - } - } - return ex; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e0e430f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java index 7e8621b..6265f44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java @@ -42,10 +42,10 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.IPFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.retry.FailoverProxyProvider; @@ -333,7 +333,7 @@ public class TestDFSClientFailover { private Class<T> xface; private T proxy; public DummyLegacyFailoverProxyProvider(Configuration conf, URI uri, - Class<T> xface) { + Class<T> xface, HAProxyFactory<T> proxyFactory) { try { this.proxy = NameNodeProxies.createNonHAProxy(conf, DFSUtilClient.getNNAddress(uri), xface, http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e0e430f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java index 7257bbd..14ad6dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java @@ -513,7 +513,7 @@ public class TestDFSUtil { NS2_NN2_HOST); Map<String, Map<String, InetSocketAddress>> map = - DFSUtil.getHaNnRpcAddresses(conf); + DFSUtilClient.getHaNnRpcAddresses(conf); assertTrue(HAUtil.isHAEnabled(conf, "ns1")); assertTrue(HAUtil.isHAEnabled(conf, "ns2")); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e0e430f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java index 632bbf6..ca44c79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java @@ -292,7 +292,7 @@ public class TestDelegationTokensWithHA { nn0.getNameNodeAddress().getPort())); nnAddrs.add(new InetSocketAddress("localhost", nn1.getNameNodeAddress().getPort())); - HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); + HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens(); assertEquals(3, tokens.size()); @@ -321,7 +321,7 @@ public class TestDelegationTokensWithHA { } // reclone the tokens, and see if they match now - HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); + HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); for (InetSocketAddress addr : nnAddrs) { Text ipcDtService = SecurityUtil.buildTokenService(addr); Token<DelegationTokenIdentifier> token2 = http://git-wip-us.apache.org/repos/asf/hadoop/blob/9e0e430f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java deleted file mode 100644 index 37532d5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java +++ /dev/null @@ -1,470 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode.ha; - -import java.io.EOFException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; -import org.apache.hadoop.io.retry.MultiException; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.StandbyException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.Time; -import org.apache.log4j.Level; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import com.google.common.collect.Lists; - -public class TestRequestHedgingProxyProvider { - - private Configuration conf; - private URI nnUri; - private String ns; - - @BeforeClass - public static void setupClass() throws Exception { - GenericTestUtils.setLogLevel(RequestHedgingProxyProvider.LOG, Level.TRACE); - } - - @Before - public void setup() throws URISyntaxException { - ns = "mycluster-" + Time.monotonicNow(); - nnUri = new URI("hdfs://" + ns); - conf = new Configuration(); - conf.set(DFSConfigKeys.DFS_NAMESERVICES, ns); - conf.set( - DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2"); - conf.set( - DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1", - "machine1.foo.bar:9820"); - conf.set( - DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2", - "machine2.foo.bar:9820"); - } - - @Test - public void testHedgingWhenOneFails() throws Exception { - final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { - @Override - public long[] answer(InvocationOnMock invocation) throws Throwable { - Thread.sleep(1000); - return new long[]{1}; - } - }); - final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); - - RequestHedgingProxyProvider<NamenodeProtocols> provider = - new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, - createFactory(badMock, goodMock)); - long[] stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Mockito.verify(badMock).getStats(); - Mockito.verify(goodMock).getStats(); - } - - @Test - public void testHedgingWhenOneIsSlow() throws Exception { - final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { - @Override - public long[] answer(InvocationOnMock invocation) throws Throwable { - Thread.sleep(1000); - return new long[]{1}; - } - }); - final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); - - RequestHedgingProxyProvider<NamenodeProtocols> provider = - new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, - createFactory(goodMock, badMock)); - long[] stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Assert.assertEquals(1, stats[0]); - Mockito.verify(badMock).getStats(); - Mockito.verify(goodMock).getStats(); - } - - @Test - public void testHedgingWhenBothFail() throws Exception { - NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!")); - NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(worseMock.getStats()).thenThrow( - new IOException("Worse mock !!")); - - RequestHedgingProxyProvider<NamenodeProtocols> provider = - new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, - createFactory(badMock, worseMock)); - try { - provider.getProxy().proxy.getStats(); - Assert.fail("Should fail since both namenodes throw IOException !!"); - } catch (Exception e) { - Assert.assertTrue(e instanceof MultiException); - } - Mockito.verify(badMock).getStats(); - Mockito.verify(worseMock).getStats(); - } - - @Test - public void testPerformFailover() throws Exception { - final AtomicInteger counter = new AtomicInteger(0); - final int[] isGood = {1}; - final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { - @Override - public long[] answer(InvocationOnMock invocation) throws Throwable { - counter.incrementAndGet(); - if (isGood[0] == 1) { - Thread.sleep(1000); - return new long[]{1}; - } - throw new IOException("Was Good mock !!"); - } - }); - final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() { - @Override - public long[] answer(InvocationOnMock invocation) throws Throwable { - counter.incrementAndGet(); - if (isGood[0] == 2) { - Thread.sleep(1000); - return new long[]{2}; - } - throw new IOException("Bad mock !!"); - } - }); - RequestHedgingProxyProvider<NamenodeProtocols> provider = - new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, - createFactory(goodMock, badMock)); - long[] stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Assert.assertEquals(1, stats[0]); - Assert.assertEquals(2, counter.get()); - Mockito.verify(badMock).getStats(); - Mockito.verify(goodMock).getStats(); - - stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Assert.assertEquals(1, stats[0]); - // Ensure only the previous successful one is invoked - Mockito.verifyNoMoreInteractions(badMock); - Assert.assertEquals(3, counter.get()); - - // Flip to standby.. so now this should fail - isGood[0] = 2; - try { - provider.getProxy().proxy.getStats(); - Assert.fail("Should fail since previously successful proxy now fails "); - } catch (Exception ex) { - Assert.assertTrue(ex instanceof IOException); - } - - Assert.assertEquals(4, counter.get()); - - provider.performFailover(provider.getProxy().proxy); - stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Assert.assertEquals(2, stats[0]); - - // Counter should update only once - Assert.assertEquals(5, counter.get()); - - stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Assert.assertEquals(2, stats[0]); - - // Counter updates only once now - Assert.assertEquals(6, counter.get()); - - // Flip back to old active.. so now this should fail - isGood[0] = 1; - try { - provider.getProxy().proxy.getStats(); - Assert.fail("Should fail since previously successful proxy now fails "); - } catch (Exception ex) { - Assert.assertTrue(ex instanceof IOException); - } - - Assert.assertEquals(7, counter.get()); - - provider.performFailover(provider.getProxy().proxy); - stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - // Ensure correct proxy was called - Assert.assertEquals(1, stats[0]); - } - - @Test - public void testPerformFailoverWith3Proxies() throws Exception { - conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, - "nn1,nn2,nn3"); - conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3", - "machine3.foo.bar:9820"); - - final AtomicInteger counter = new AtomicInteger(0); - final int[] isGood = {1}; - final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() { - @Override - public long[] answer(InvocationOnMock invocation) throws Throwable { - counter.incrementAndGet(); - if (isGood[0] == 1) { - Thread.sleep(1000); - return new long[]{1}; - } - throw new IOException("Was Good mock !!"); - } - }); - final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() { - @Override - public long[] answer(InvocationOnMock invocation) throws Throwable { - counter.incrementAndGet(); - if (isGood[0] == 2) { - Thread.sleep(1000); - return new long[]{2}; - } - throw new IOException("Bad mock !!"); - } - }); - final NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class); - Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() { - @Override - public long[] answer(InvocationOnMock invocation) throws Throwable { - counter.incrementAndGet(); - if (isGood[0] == 3) { - Thread.sleep(1000); - return new long[]{3}; - } - throw new IOException("Worse mock !!"); - } - }); - - RequestHedgingProxyProvider<NamenodeProtocols> provider = - new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class, - createFactory(goodMock, badMock, worseMock)); - long[] stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Assert.assertEquals(1, stats[0]); - Assert.assertEquals(3, counter.get()); - Mockito.verify(badMock).getStats(); - Mockito.verify(goodMock).getStats(); - Mockito.verify(worseMock).getStats(); - - stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Assert.assertEquals(1, stats[0]); - // Ensure only the previous successful one is invoked - Mockito.verifyNoMoreInteractions(badMock); - Mockito.verifyNoMoreInteractions(worseMock); - Assert.assertEquals(4, counter.get()); - - // Flip to standby.. so now this should fail - isGood[0] = 2; - try { - provider.getProxy().proxy.getStats(); - Assert.fail("Should fail since previously successful proxy now fails "); - } catch (Exception ex) { - Assert.assertTrue(ex instanceof IOException); - } - - Assert.assertEquals(5, counter.get()); - - provider.performFailover(provider.getProxy().proxy); - stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Assert.assertEquals(2, stats[0]); - - // Counter updates twice since both proxies are tried on failure - Assert.assertEquals(7, counter.get()); - - stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Assert.assertEquals(2, stats[0]); - - // Counter updates only once now - Assert.assertEquals(8, counter.get()); - - // Flip to Other standby.. so now this should fail - isGood[0] = 3; - try { - provider.getProxy().proxy.getStats(); - Assert.fail("Should fail since previously successful proxy now fails "); - } catch (Exception ex) { - Assert.assertTrue(ex instanceof IOException); - } - - // Counter should ipdate only 1 time - Assert.assertEquals(9, counter.get()); - - provider.performFailover(provider.getProxy().proxy); - stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - - // Ensure correct proxy was called - Assert.assertEquals(3, stats[0]); - - // Counter updates twice since both proxies are tried on failure - Assert.assertEquals(11, counter.get()); - - stats = provider.getProxy().proxy.getStats(); - Assert.assertTrue(stats.length == 1); - Assert.assertEquals(3, stats[0]); - - // Counter updates only once now - Assert.assertEquals(12, counter.get()); - } - - @Test - public void testHedgingWhenFileNotFoundException() throws Exception { - NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); - Mockito - .when(active.getBlockLocations(Matchers.anyString(), - Matchers.anyLong(), Matchers.anyLong())) - .thenThrow(new RemoteException("java.io.FileNotFoundException", - "File does not exist!")); - - NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); - Mockito - .when(standby.getBlockLocations(Matchers.anyString(), - Matchers.anyLong(), Matchers.anyLong())) - .thenThrow( - new RemoteException("org.apache.hadoop.ipc.StandbyException", - "Standby NameNode")); - - RequestHedgingProxyProvider<NamenodeProtocols> provider = - new RequestHedgingProxyProvider<>(conf, nnUri, - NamenodeProtocols.class, createFactory(active, standby)); - try { - provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L); - Assert.fail("Should fail since the active namenode throws" - + " FileNotFoundException!"); - } catch (MultiException me) { - for (Exception ex : me.getExceptions().values()) { - Exception rEx = ((RemoteException) ex).unwrapRemoteException(); - if (rEx instanceof StandbyException) { - continue; - } - Assert.assertTrue(rEx instanceof FileNotFoundException); - } - } - Mockito.verify(active).getBlockLocations(Matchers.anyString(), - Matchers.anyLong(), Matchers.anyLong()); - Mockito.verify(standby).getBlockLocations(Matchers.anyString(), - Matchers.anyLong(), Matchers.anyLong()); - } - - @Test - public void testHedgingWhenConnectException() throws Exception { - NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); - Mockito.when(active.getStats()).thenThrow(new ConnectException()); - - NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); - Mockito.when(standby.getStats()) - .thenThrow( - new RemoteException("org.apache.hadoop.ipc.StandbyException", - "Standby NameNode")); - - RequestHedgingProxyProvider<NamenodeProtocols> provider = - new RequestHedgingProxyProvider<>(conf, nnUri, - NamenodeProtocols.class, createFactory(active, standby)); - try { - provider.getProxy().proxy.getStats(); - Assert.fail("Should fail since the active namenode throws" - + " ConnectException!"); - } catch (MultiException me) { - for (Exception ex : me.getExceptions().values()) { - if (ex instanceof RemoteException) { - Exception rEx = ((RemoteException) ex) - .unwrapRemoteException(); - Assert.assertTrue("Unexpected RemoteException: " + rEx.getMessage(), - rEx instanceof StandbyException); - } else { - Assert.assertTrue(ex instanceof ConnectException); - } - } - } - Mockito.verify(active).getStats(); - Mockito.verify(standby).getStats(); - } - - @Test - public void testHedgingWhenConnectAndEOFException() throws Exception { - NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); - Mockito.when(active.getStats()).thenThrow(new EOFException()); - - NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); - Mockito.when(standby.getStats()).thenThrow(new ConnectException()); - - RequestHedgingProxyProvider<NamenodeProtocols> provider = - new RequestHedgingProxyProvider<>(conf, nnUri, - NamenodeProtocols.class, createFactory(active, standby)); - try { - provider.getProxy().proxy.getStats(); - Assert.fail("Should fail since both active and standby namenodes throw" - + " Exceptions!"); - } catch (MultiException me) { - for (Exception ex : me.getExceptions().values()) { - if (!(ex instanceof ConnectException) && - !(ex instanceof EOFException)) { - Assert.fail("Unexpected Exception " + ex.getMessage()); - } - } - } - Mockito.verify(active).getStats(); - Mockito.verify(standby).getStats(); - } - - private ProxyFactory<NamenodeProtocols> createFactory( - NamenodeProtocols... protos) { - final Iterator<NamenodeProtocols> iterator = - Lists.newArrayList(protos).iterator(); - return new ProxyFactory<NamenodeProtocols>() { - @Override - public NamenodeProtocols createProxy(Configuration conf, - InetSocketAddress nnAddr, Class<NamenodeProtocols> xface, - UserGroupInformation ugi, boolean withRetries, - AtomicBoolean fallbackToSimpleAuth) throws IOException { - return iterator.next(); - } - }; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
