This is an automated email from the ASF dual-hosted git repository. inigoiri pushed a commit to branch HDFS-13891 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 6e770ff428666f5bcd7dd25f2672558bf6b65426 Author: Inigo Goiri <[email protected]> AuthorDate: Wed Jan 2 10:49:00 2019 -0800 HDFS-14161. RBF: Throw StandbyException instead of IOException so that client can retry when can not get connection. Contributed by Fei Hui. --- .../federation/router/ConnectionNullException.java | 33 ++++++++++++++++++ .../server/federation/router/RouterRpcClient.java | 20 ++++++++--- .../server/federation/FederationTestUtils.java | 31 +++++++++++++++++ .../router/TestRouterClientRejectOverload.java | 40 ++++++++++++++++++++++ 4 files changed, 120 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionNullException.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionNullException.java new file mode 100644 index 0000000..53de602 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionNullException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; + + +/** + * Exception when can not get a non-null connection. + */ +public class ConnectionNullException extends IOException { + + private static final long serialVersionUID = 1L; + + public ConnectionNullException(String msg) { + super(msg); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index a21e980..c4d3a20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -270,7 +270,8 @@ public class RouterRpcClient { } if (connection == null) { - throw new IOException("Cannot get a connection to " + rpcAddress); + throw new ConnectionNullException("Cannot get a connection to " + + rpcAddress); } return connection; } @@ -363,9 +364,9 @@ public class RouterRpcClient { Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>(); for (FederationNamenodeContext namenode : namenodes) { ConnectionContext connection = null; + String nsId = namenode.getNameserviceId(); + String rpcAddress = namenode.getRpcAddress(); try { - String nsId = namenode.getNameserviceId(); - String rpcAddress = namenode.getRpcAddress(); connection = this.getConnection(ugi, nsId, rpcAddress, protocol); ProxyAndInfo<?> client = connection.getClient(); final Object proxy = client.getProxy(); @@ -394,6 +395,16 @@ public class RouterRpcClient { } // RemoteException returned by NN throw (RemoteException) ioe; + } else if (ioe instanceof ConnectionNullException) { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureCommunicate(); + } + LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress, + ioe.getMessage()); + // Throw StandbyException so that client can retry + StandbyException se = new StandbyException(ioe.getMessage()); + se.initCause(ioe); + throw se; } else { // Other communication error, this is a failure // Communication retries are handled by the retry policy @@ -425,7 +436,8 @@ public class RouterRpcClient { String addr = namenode.getRpcAddress(); IOException ioe = entry.getValue(); if (ioe instanceof StandbyException) { - LOG.error("{} {} at {} is in Standby", nsId, nnId, addr); + LOG.error("{} {} at {} is in Standby: {}", nsId, nnId, addr, + ioe.getMessage()); } else { LOG.error("{} {} at {} error: \"{}\"", nsId, nnId, addr, ioe.getMessage()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index 5095c6b..d92edac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -52,6 +52,9 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.federation.router.ConnectionManager; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; @@ -60,6 +63,7 @@ import org.apache.hadoop.hdfs.server.federation.store.RouterStore; import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.Whitebox; import org.mockito.invocation.InvocationOnMock; @@ -343,4 +347,31 @@ public final class FederationTestUtils { } }, 100, timeout); } + + /** + * Simulate that a RouterRpcServer, the ConnectionManager of its + * RouterRpcClient throws IOException when call getConnection. So the + * RouterRpcClient will get a null Connection. + * @param server RouterRpcServer + * @throws IOException + */ + public static void simulateThrowExceptionRouterRpcServer( + final RouterRpcServer server) throws IOException { + RouterRpcClient rpcClient = server.getRPCClient(); + ConnectionManager connectionManager = + new ConnectionManager(server.getConfig()); + ConnectionManager spyConnectionManager = spy(connectionManager); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + LOG.info("Simulating connectionManager throw IOException {}", + invocation.getMock()); + throw new IOException("Simulate connectionManager throw IOException"); + } + }).when(spyConnectionManager).getConnection( + any(UserGroupInformation.class), any(String.class), any(Class.class)); + + Whitebox.setInternalState(rpcClient, "connectionManager", + spyConnectionManager); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java index 3c51e13..0664159 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.federation.router; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer; import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -240,4 +241,43 @@ public class TestRouterClientRejectOverload { num <= expOverloadMax); } } + + @Test + public void testConnectionNullException() throws Exception { + setupCluster(false); + + // Choose 1st router + RouterContext routerContext = cluster.getRouters().get(0); + Router router = routerContext.getRouter(); + // This router will throw ConnectionNullException + simulateThrowExceptionRouterRpcServer(router.getRpcServer()); + + // Set dfs.client.failover.random.order false, to pick 1st router at first + Configuration conf = cluster.getRouterClientConf(); + conf.setBoolean("dfs.client.failover.random.order", false); + // Client to access Router Cluster + DFSClient routerClient = + new DFSClient(new URI("hdfs://fed"), conf); + + // Get router0 metrics + FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0) + .getRouter().getRpcServer().getRPCMetrics(); + // Get router1 metrics + FederationRPCMetrics rpcMetrics1 = cluster.getRouters().get(1) + .getRouter().getRpcServer().getRPCMetrics(); + + // Original failures + long originalRouter0Failures = rpcMetrics0.getProxyOpFailureCommunicate(); + long originalRouter1Failures = rpcMetrics1.getProxyOpFailureCommunicate(); + + // RPC call must be successful + routerClient.getFileInfo("/"); + + // Router 0 failures will increase + assertEquals(originalRouter0Failures + 1, + rpcMetrics0.getProxyOpFailureCommunicate()); + // Router 1 failures will not change + assertEquals(originalRouter1Failures, + rpcMetrics1.getProxyOpFailureCommunicate()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
