This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new cab7086 HDFS-16369. RBF: Fix the retry logic of
RouterRpcServer#invokeAtAvailableNs. (#3745). Contributed by Ayush Saxena.
cab7086 is described below
commit cab7086fbcbb82fe8deac9fdbb1c852c7c47733f
Author: Ayush Saxena <[email protected]>
AuthorDate: Sat Dec 4 10:54:29 2021 +0530
HDFS-16369. RBF: Fix the retry logic of
RouterRpcServer#invokeAtAvailableNs. (#3745). Contributed by Ayush Saxena.
Reviewed-by: litao <[email protected]>
Reviewed-by: Inigo Goiri <[email protected]>
---
.../server/federation/router/RouterRpcServer.java | 68 ++++++++++------------
...erRPCMultipleDestinationMountTableResolver.java | 14 ++++-
2 files changed, 43 insertions(+), 39 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 48d4ff0..2b6c4a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -43,7 +43,6 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -671,8 +670,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
/**
* Invokes the method at default namespace, if default namespace is not
- * available then at the first available namespace.
- * If the namespace is unavailable, retry once with other namespace.
+ * available then at the other available namespaces.
+ * If the namespace is unavailable, retry with other namespaces.
* @param <T> expected return type.
* @param method the remote method.
* @return the response received after invoking method.
@@ -681,28 +680,29 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
<T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
throws IOException {
String nsId = subclusterResolver.getDefaultNamespace();
- // If default Ns is not present return result from first namespace.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- try {
- if (!nsId.isEmpty()) {
+ // If no namespace is available, then throw this IOException.
+ IOException io = new IOException("No namespace available.");
+ // If default Ns is present return result from that namespace.
+ if (!nsId.isEmpty()) {
+ try {
return rpcClient.invokeSingle(nsId, method, clazz);
+ } catch (IOException ioe) {
+ if (!clientProto.isUnavailableSubclusterException(ioe)) {
+ LOG.debug("{} exception cannot be retried",
+ ioe.getClass().getSimpleName());
+ throw ioe;
+ }
+ // Remove the already tried namespace.
+ nss.removeIf(n -> n.getNameserviceId().equals(nsId));
+ return invokeOnNs(method, clazz, io, nss);
}
- // If no namespace is available, throw IOException.
- IOException io = new IOException("No namespace available.");
- return invokeOnNs(method, clazz, io, nss);
- } catch (IOException ioe) {
- if (!clientProto.isUnavailableSubclusterException(ioe)) {
- LOG.debug("{} exception cannot be retried",
- ioe.getClass().getSimpleName());
- throw ioe;
- }
- Set<FederationNamespaceInfo> nssWithoutFailed = getNameSpaceInfo(nss, nsId);
- return invokeOnNs(method, clazz, ioe, nssWithoutFailed);
}
+ return invokeOnNs(method, clazz, io, nss);
}
/**
- * Invoke the method on first available namespace,
+ * Invoke the method sequentially on available namespaces,
* throw no namespace available exception, if no namespaces are available.
* @param method the remote method.
* @param clazz Class for the return type.
@@ -716,26 +716,22 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
if (nss.isEmpty()) {
throw ioe;
}
- String nsId = nss.iterator().next().getNameserviceId();
- return rpcClient.invokeSingle(nsId, method, clazz);
- }
-
- /**
- * Get set of namespace info's removing the already invoked namespaceinfo.
- * @param nss List of namespaces in the federation.
- * @param nsId Already invoked namespace id.
- * @return List of name spaces in the federation on
- * removing the already invoked namespaceinfo.
- */
- private static Set<FederationNamespaceInfo> getNameSpaceInfo(
- final Set<FederationNamespaceInfo> nss, final String nsId) {
- Set<FederationNamespaceInfo> namespaceInfos = new HashSet<>();
- for (FederationNamespaceInfo ns : nss) {
- if (!nsId.equals(ns.getNameserviceId())) {
- namespaceInfos.add(ns);
+ for (FederationNamespaceInfo fnInfo : nss) {
+ String nsId = fnInfo.getNameserviceId();
+ LOG.debug("Invoking {} on namespace {}", method, nsId);
+ try {
+ return rpcClient.invokeSingle(nsId, method, clazz);
+ } catch (IOException e) {
+ LOG.debug("Failed to invoke {} on namespace {}", method, nsId, e);
+ // Ignore the exception and try on other namespace, if the tried
+ // namespace is unavailable, else throw the received exception.
+ if (!clientProto.isUnavailableSubclusterException(e)) {
+ throw e;
+ }
}
}
- return namespaceInfos;
+ // Couldn't get a response from any of the namespace, throw ioe.
+ throw ioe;
}
@Override // ClientProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
index ee92ec4..aa29e8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
@@ -76,13 +76,14 @@ import org.junit.Test;
* Tests router rpc with multiple destination mount table resolver.
*/
public class TestRouterRPCMultipleDestinationMountTableResolver {
- private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1");
+ private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1", "ns2");
private static StateStoreDFSCluster cluster;
private static RouterContext routerContext;
private static MountTableResolver resolver;
private static DistributedFileSystem nnFs0;
private static DistributedFileSystem nnFs1;
+ private static DistributedFileSystem nnFs2;
private static DistributedFileSystem routerFs;
private static RouterRpcServer rpcServer;
@@ -90,7 +91,7 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
public static void setUp() throws Exception {
// Build and start a federated cluster
- cluster = new StateStoreDFSCluster(false, 2,
+ cluster = new StateStoreDFSCluster(false, 3,
MultipleDestinationMountTableResolver.class);
Configuration routerConf =
new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
@@ -111,6 +112,8 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
.getNamenode(cluster.getNameservices().get(0), null).getFileSystem();
nnFs1 = (DistributedFileSystem) cluster
.getNamenode(cluster.getNameservices().get(1), null).getFileSystem();
+ nnFs2 = (DistributedFileSystem) cluster
+ .getNamenode(cluster.getNameservices().get(2), null).getFileSystem();
routerFs = (DistributedFileSystem) routerContext.getFileSystem();
rpcServer =routerContext.getRouter().getRpcServer();
}
@@ -668,6 +671,7 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
// Make one subcluster unavailable.
MiniDFSCluster dfsCluster = cluster.getCluster();
dfsCluster.shutdownNameNode(0);
+ dfsCluster.shutdownNameNode(1);
try {
// Verify that #invokeAtAvailableNs works by calling #getServerDefaults.
RemoteMethod method = new RemoteMethod("getServerDefaults");
@@ -675,7 +679,8 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
rpcServer.invokeAtAvailableNs(method, FsServerDefaults.class);
assertNotNull(serverDefaults);
} finally {
- dfsCluster.restartNameNode(0);
+ dfsCluster.restartNameNode(0, false);
+ dfsCluster.restartNameNode(1);
}
}
@@ -893,6 +898,9 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
if (nsId.equals("ns1")) {
return nnFs1;
}
+ if (nsId.equals("ns2")) {
+ return nnFs2;
+ }
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]