HDFS-12813. RequestHedgingProxyProvider can hide Exception thrown from the Namenode for proxy size of 1. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/659e85e3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/659e85e3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/659e85e3 Branch: refs/heads/YARN-5881 Commit: 659e85e304d070f9908a96cf6a0e1cbafde6a434 Parents: 60fc2a1 Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Mon Nov 20 17:09:19 2017 -0800 Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Committed: Mon Nov 20 17:09:19 2017 -0800 ---------------------------------------------------------------------- .../ha/RequestHedgingProxyProvider.java | 81 ++++++++++++++------ .../ha/TestRequestHedgingProxyProvider.java | 58 ++++++++++++++ 2 files changed, 114 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/659e85e3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java index b94e94d..08edfe2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.URI; @@ -29,6 +30,7 @@ import 
java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RemoteException; @@ -87,9 +89,19 @@ public class RequestHedgingProxyProvider<T> extends targetProxies.remove(toIgnore); if (targetProxies.size() == 1) { ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next(); - Object retVal = method.invoke(proxyInfo.proxy, args); - successfulProxy = proxyInfo; - return retVal; + try { + currentUsedProxy = proxyInfo; + Object retVal = method.invoke(proxyInfo.proxy, args); + LOG.debug("Invocation successful on [{}]", + currentUsedProxy.proxyInfo); + return retVal; + } catch (InvocationTargetException ex) { + Exception unwrappedException = unwrapInvocationTargetException(ex); + logProxyException(unwrappedException, currentUsedProxy.proxyInfo); + LOG.trace("Unsuccessful invocation on [{}]", + currentUsedProxy.proxyInfo); + throw unwrappedException; + } } executor = Executors.newFixedThreadPool(proxies.size()); completionService = new ExecutorCompletionService<>(executor); @@ -112,15 +124,16 @@ public class RequestHedgingProxyProvider<T> extends Future<Object> callResultFuture = completionService.take(); Object retVal; try { + currentUsedProxy = proxyMap.get(callResultFuture); retVal = callResultFuture.get(); - successfulProxy = proxyMap.get(callResultFuture); LOG.debug("Invocation successful on [{}]", - successfulProxy.proxyInfo); + currentUsedProxy.proxyInfo); return retVal; - } catch (Exception ex) { + } catch (ExecutionException ex) { + Exception unwrappedException = unwrapExecutionException(ex); ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture); - logProxyException(ex, tProxyInfo.proxyInfo); - badResults.put(tProxyInfo.proxyInfo, unwrapException(ex)); + logProxyException(unwrappedException, tProxyInfo.proxyInfo); + 
badResults.put(tProxyInfo.proxyInfo, unwrappedException); LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo); numAttempts--; } @@ -143,7 +156,7 @@ public class RequestHedgingProxyProvider<T> extends } - private volatile ProxyInfo<T> successfulProxy = null; + private volatile ProxyInfo<T> currentUsedProxy = null; private volatile String toIgnore = null; public RequestHedgingProxyProvider(Configuration conf, URI uri, @@ -154,8 +167,8 @@ public class RequestHedgingProxyProvider<T> extends @SuppressWarnings("unchecked") @Override public synchronized ProxyInfo<T> getProxy() { - if (successfulProxy != null) { - return successfulProxy; + if (currentUsedProxy != null) { + return currentUsedProxy; } Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>(); StringBuilder combinedInfo = new StringBuilder("["); @@ -175,8 +188,8 @@ public class RequestHedgingProxyProvider<T> extends @Override public synchronized void performFailover(T currentProxy) { - toIgnore = successfulProxy.proxyInfo; - successfulProxy = null; + toIgnore = this.currentUsedProxy.proxyInfo; + this.currentUsedProxy = null; } /** @@ -187,19 +200,18 @@ public class RequestHedgingProxyProvider<T> extends */ private void logProxyException(Exception ex, String proxyInfo) { if (isStandbyException(ex)) { - LOG.debug("Invocation returned standby exception on [{}]", proxyInfo); + LOG.debug("Invocation returned standby exception on [{}]", proxyInfo, ex); } else { - LOG.warn("Invocation returned exception on [{}]", proxyInfo); + LOG.warn("Invocation returned exception on [{}]", proxyInfo, ex); } } /** * Check if the returned exception is caused by an standby namenode. - * @param ex Exception to check. + * @param exception Exception to check. * @return If the exception is caused by an standby namenode. 
*/ - private boolean isStandbyException(Exception ex) { - Exception exception = unwrapException(ex); + private boolean isStandbyException(Exception exception) { if (exception instanceof RemoteException) { return ((RemoteException) exception).unwrapRemoteException() instanceof StandbyException; @@ -208,24 +220,43 @@ public class RequestHedgingProxyProvider<T> extends } /** - * Unwraps the exception. <p> + * Unwraps the ExecutionException. <p> * Example: * <blockquote><pre> * if ex is - * ExecutionException(InvocationTargetExeption(SomeException)) + * ExecutionException(InvocationTargetException(SomeException)) * returns SomeException * </pre></blockquote> * * @return unwrapped exception */ - private Exception unwrapException(Exception ex) { + private Exception unwrapExecutionException(ExecutionException ex) { + if (ex != null) { + Throwable cause = ex.getCause(); + if (cause instanceof InvocationTargetException) { + return + unwrapInvocationTargetException((InvocationTargetException)cause); + } + } + return ex; + + } + + /** + * Unwraps the InvocationTargetException. 
<p> + * Example: + * <blockquote><pre> + * if ex is InvocationTargetException(SomeException) + * returns SomeException + * </pre></blockquote> + * + * @return unwrapped exception + */ + private Exception unwrapInvocationTargetException( + InvocationTargetException ex) { if (ex != null) { Throwable cause = ex.getCause(); if (cause instanceof Exception) { - Throwable innerCause = cause.getCause(); - if (innerCause instanceof Exception) { - return (Exception) innerCause; - } return (Exception) cause; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/659e85e3/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java index 724b5f0..8feba96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java @@ -232,6 +232,64 @@ public class TestRequestHedgingProxyProvider { } @Test + public void testFileNotFoundExceptionWithSingleProxy() throws Exception { + ClientProtocol active = Mockito.mock(ClientProtocol.class); + Mockito + .when(active.getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong())) + .thenThrow(new RemoteException("java.io.FileNotFoundException", + "File does not exist!")); + + ClientProtocol standby = Mockito.mock(ClientProtocol.class); + Mockito + .when(standby.getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong())) + .thenThrow( + new 
RemoteException("org.apache.hadoop.ipc.StandbyException", + "Standby NameNode")); + + RequestHedgingProxyProvider<ClientProtocol> provider = + new RequestHedgingProxyProvider<>(conf, nnUri, + ClientProtocol.class, createFactory(standby, active)); + try { + provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L); + Assert.fail("Should fail since the active namenode throws" + + " FileNotFoundException!"); + } catch (MultiException me) { + for (Exception ex : me.getExceptions().values()) { + Exception rEx = ((RemoteException) ex).unwrapRemoteException(); + if (rEx instanceof StandbyException) { + continue; + } + Assert.assertTrue(rEx instanceof FileNotFoundException); + } + } + //Perform failover now, there will only be one active proxy now + provider.performFailover(active); + try { + provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L); + Assert.fail("Should fail since the active namenode throws" + + " FileNotFoundException!"); + } catch (RemoteException ex) { + Exception rEx = ex.unwrapRemoteException(); + if (rEx instanceof StandbyException) { + Mockito.verify(active).getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong()); + Mockito.verify(standby, Mockito.times(2)) + .getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong()); + } else { + Assert.assertTrue(rEx instanceof FileNotFoundException); + Mockito.verify(active, Mockito.times(2)) + .getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong()); + Mockito.verify(standby).getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong()); + } + } + } + + @Test public void testPerformFailoverWith3Proxies() throws Exception { conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." 
+ ns, "nn1,nn2,nn3"); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org