KeeProMise commented on code in PR #7304:
URL: https://github.com/apache/hadoop/pull/7304#discussion_r1998034741
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java:
##########
@@ -420,7 +430,6 @@ public <R extends RemoteLocationContext, T> RemoteResult
invokeSequential(
return ret;
}, Exception.class);
asyncFinally(ret -> {
- releasePermit(ns, ugi, remoteMethod, controller);
return ret;
Review Comment:
No need for asyncFinally here
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java:
##########
@@ -568,7 +642,6 @@ public <T extends RemoteLocationContext, R>
List<RemoteResult<T, R>> invokeSingl
throw processException(ioe, location);
}, IOException.class);
asyncFinally(o -> {
- releasePermit(ns, ugi, method, controller);
return o;
});
Review Comment:
No need for asyncFinally here
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java:
##########
@@ -589,8 +662,6 @@ public <T extends RemoteLocationContext, R>
List<RemoteResult<T, R>> invokeSingl
public Object invokeSingle(final String nsId, RemoteMethod method)
throws IOException {
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
- RouterRpcFairnessPolicyController controller =
getRouterRpcFairnessPolicyController();
- acquirePermit(nsId, ugi, method, controller);
asyncTry(() -> {
boolean isObserverRead = isObserverReadEligible(nsId,
method.getMethod());
Review Comment:
```java
public Object invokeSingle(final String nsId, RemoteMethod method)
throws IOException {
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
boolean isObserverRead = isObserverReadEligible(nsId,
method.getMethod());
List<? extends FederationNamenodeContext> nns =
getOrderedNamenodes(nsId, isObserverRead);
RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
Class<?> proto = method.getProtocol();
Method m = method.getMethod();
Object[] params = method.getParams(loc);
invokeMethod(ugi, nns, isObserverRead, proto, m, params);
return null;
}
```
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java:
##########
@@ -627,4 +697,46 @@ public <T> T invokeSingle(
invokeSequential(locations, remoteMethod);
return asyncReturn(clazz);
}
+
+ protected void acquirePermit(final String nsId, final UserGroupInformation
ugi,
+ final Method m, RouterRpcFairnessPolicyController controller)
+ throws IOException {
+ if (controller != null) {
+ if (!controller.acquirePermit(nsId)) {
+ // Throw StandByException,
+ // Clients could fail over and try another router.
+ if (rpcMonitor != null) {
+ rpcMonitor.proxyOpPermitRejected(nsId);
+ }
+ incrRejectedPermitForNs(nsId);
+ LOG.debug("Permit denied for ugi: {} for method: {}",
+ ugi, m.getName());
+ String msg =
+ "Router " + router.getRouterId() +
+ " is overloaded for NS: " + nsId;
+ throw new StandbyException(msg);
+ }
+ if (rpcMonitor != null) {
+ rpcMonitor.proxyOpPermitAccepted(nsId);
+ }
+ incrAcceptedPermitForNs(nsId);
+ }
+ }
Review Comment:
Here you can add a method acquirePermit(final String nsId, final
UserGroupInformation ugi,
final String methodName, RouterRpcFairnessPolicyController controller) in
RouterRpcClient, and then RouterRpcClient#acquirePermit(final String nsId,
final UserGroupInformation ugi,
final RemoteMethod m, RouterRpcFairnessPolicyController controller) and
RouterAsyncRpcClient#acquirePermit(final String nsId, final
UserGroupInformation ugi,
final Method m, RouterRpcFairnessPolicyController controller) both call the
public method.
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java:
##########
@@ -178,7 +179,7 @@ public Object invokeMethod(
namenodes.toString(), params);
}
threadLocalContext.transfer();
- invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
+ invokeMethodAsync(nsid, ugi, (List<FederationNamenodeContext>) namenodes,
useObserver, protocol, method, params);
Review Comment:
```java
RouterRpcFairnessPolicyController controller =
getRouterRpcFairnessPolicyController();
acquirePermit(nsid, ugi, method, controller);
invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
useObserver, protocol, method, params);
asyncFinally(object -> {
releasePermit(nsid, ugi, method, controller);
return object;
});
```
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java:
##########
@@ -202,58 +203,69 @@ public Object invokeMethod(
* @param params The parameters for the method invocation.
*/
private void invokeMethodAsync(
+ String nsid,
final UserGroupInformation ugi,
final List<FederationNamenodeContext> namenodes,
Review Comment:
Do not modify this method, the logic will become complicated and the
readability will be poor
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java:
##########
@@ -479,6 +488,76 @@ public <T extends RemoteLocationContext, R> Map<T, R>
invokeConcurrent(
return asyncReturn(Map.class);
}
+ @SuppressWarnings("unchecked")
+ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>>
invokeConcurrent(
+ final Collection<T> locations, final RemoteMethod method,
+ boolean standby, long timeOutMs,
+ Class<R> clazz) throws IOException {
+
+ final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+ final Method m = method.getMethod();
+
+ if (locations.isEmpty()) {
+ throw new IOException("No remote locations available");
+ } else if (locations.size() == 1 && timeOutMs <= 0) {
+ // Shortcut, just one call
+ return invokeSingle(locations.iterator().next(), method);
+ }
+ // Don't acquire CONCURRENT_NS permit here.
+ RouterRpcFairnessPolicyController controller =
getRouterRpcFairnessPolicyController();
+
Review Comment:
I understand that you don't want to acquirePermit(CONCURRENT_NS, ugi,
method, controller) here. You can do special processing for the case where
ns=CONCURRENT_NS in RouterAsyncRpcClient#acquirePermit or
RouterAsyncRpcFairnessPolicyController without duplicating this method again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]