goiri commented on code in PR #4897:
URL: https://github.com/apache/hadoop/pull/4897#discussion_r974657324
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -463,42 +461,67 @@ public void recover(Map<String, byte[]> recoveredDataMap)
{
// map as well.
UserGroupInformation appSubmitter;
if (UserGroupInformation.isSecurityEnabled()) {
- appSubmitter =
UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
+ appSubmitter = UserGroupInformation.createProxyUser(user,
UserGroupInformation.getLoginUser());
} else {
- appSubmitter =
UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
+ appSubmitter = UserGroupInformation.createRemoteUser(user);
}
- ApplicationClientProtocol rmClient =
- createHomeRMProxy(getApplicationContext(),
- ApplicationClientProtocol.class, appSubmitter);
- GetContainersResponse response = rmClient
- .getContainers(GetContainersRequest.newInstance(this.attemptId));
+ ApplicationClientProtocol rmClient =
createHomeRMProxy(applicationContext,
+ ApplicationClientProtocol.class, appSubmitter);
+
+ GetContainersRequest request =
GetContainersRequest.newInstance(this.attemptId);
+ GetContainersResponse response = rmClient.getContainers(request);
+
for (ContainerReport container : response.getContainerList()) {
- containerIdToSubClusterIdMap.put(container.getContainerId(),
- this.homeSubClusterId);
+ ContainerId containerId = container.getContainerId();
+ containerIdToSubClusterIdMap.put(containerId, this.homeSubClusterId);
containers++;
- LOG.debug(" From home RM {} running container {}",
- this.homeSubClusterId, container.getContainerId());
+ LOG.debug("From home RM {} running container {}.",
this.homeSubClusterId, containerId);
}
- LOG.info("{} running containers including AM recovered from home RM {}",
+ LOG.info("{} running containers including AM recovered from home RM {}.",
response.getContainerList().size(), this.homeSubClusterId);
- LOG.info(
- "In all {} UAMs {} running containers including AM recovered for {}",
+ LOG.info("In all {} UAMs {} running containers including AM recovered
for {}.",
uamMap.size(), containers, this.attemptId);
- if (this.amRegistrationResponse != null) {
+ if (queue != null) {
// Initialize the AMRMProxyPolicy
- String queue = this.amRegistrationResponse.getQueue();
- this.policyInterpreter =
- FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
- getConf(), this.federationFacade, this.homeSubClusterId);
+ queue = this.amRegistrationResponse.getQueue();
+ this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue,
this.policyInterpreter,
+ getConf(), this.federationFacade, this.homeSubClusterId);
}
} catch (IOException | YarnException e) {
throw new YarnRuntimeException(e);
}
+ }
+ private Map<String, Token<AMRMTokenIdentifier>> getSCAMRMTokenIdentifierMap(
+ Map<String, byte[]> recoveredDataMap) throws IOException {
+ Map<String, Token<AMRMTokenIdentifier>> uamMap = new HashMap<>();
+ ApplicationId applicationId = this.attemptId.getApplicationId();
+ if (this.registryClient != null) {
+ uamMap = this.registryClient.loadStateFromRegistry(applicationId);
+ LOG.info("Found {} existing UAMs for application {} in Yarn Registry.",
+ uamMap.size(), applicationId);
+ } else {
+ for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
Review Comment:
It might be good to move this large chunk into a separate method.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java:
##########
@@ -463,42 +461,67 @@ public void recover(Map<String, byte[]> recoveredDataMap)
{
// map as well.
UserGroupInformation appSubmitter;
if (UserGroupInformation.isSecurityEnabled()) {
- appSubmitter =
UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
+ appSubmitter = UserGroupInformation.createProxyUser(user,
UserGroupInformation.getLoginUser());
} else {
- appSubmitter =
UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
+ appSubmitter = UserGroupInformation.createRemoteUser(user);
}
- ApplicationClientProtocol rmClient =
- createHomeRMProxy(getApplicationContext(),
- ApplicationClientProtocol.class, appSubmitter);
- GetContainersResponse response = rmClient
- .getContainers(GetContainersRequest.newInstance(this.attemptId));
+ ApplicationClientProtocol rmClient =
createHomeRMProxy(applicationContext,
+ ApplicationClientProtocol.class, appSubmitter);
+
+ GetContainersRequest request =
GetContainersRequest.newInstance(this.attemptId);
+ GetContainersResponse response = rmClient.getContainers(request);
+
for (ContainerReport container : response.getContainerList()) {
- containerIdToSubClusterIdMap.put(container.getContainerId(),
- this.homeSubClusterId);
+ ContainerId containerId = container.getContainerId();
+ containerIdToSubClusterIdMap.put(containerId, this.homeSubClusterId);
containers++;
- LOG.debug(" From home RM {} running container {}",
- this.homeSubClusterId, container.getContainerId());
+ LOG.debug("From home RM {} running container {}.",
this.homeSubClusterId, containerId);
}
- LOG.info("{} running containers including AM recovered from home RM {}",
+ LOG.info("{} running containers including AM recovered from home RM {}.",
response.getContainerList().size(), this.homeSubClusterId);
- LOG.info(
- "In all {} UAMs {} running containers including AM recovered for {}",
+ LOG.info("In all {} UAMs {} running containers including AM recovered
for {}.",
uamMap.size(), containers, this.attemptId);
- if (this.amRegistrationResponse != null) {
+ if (queue != null) {
// Initialize the AMRMProxyPolicy
- String queue = this.amRegistrationResponse.getQueue();
- this.policyInterpreter =
- FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
- getConf(), this.federationFacade, this.homeSubClusterId);
+ queue = this.amRegistrationResponse.getQueue();
+ this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue,
this.policyInterpreter,
+ getConf(), this.federationFacade, this.homeSubClusterId);
}
} catch (IOException | YarnException e) {
throw new YarnRuntimeException(e);
}
+ }
+ private Map<String, Token<AMRMTokenIdentifier>> getSCAMRMTokenIdentifierMap(
+ Map<String, byte[]> recoveredDataMap) throws IOException {
+ Map<String, Token<AMRMTokenIdentifier>> uamMap = new HashMap<>();
+ ApplicationId applicationId = this.attemptId.getApplicationId();
+ if (this.registryClient != null) {
+ uamMap = this.registryClient.loadStateFromRegistry(applicationId);
+ LOG.info("Found {} existing UAMs for application {} in Yarn Registry.",
Review Comment:
Nit: this should read "YARN registry" rather than "Yarn Registry".
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -1242,4 +1255,130 @@ public void testRemoveAppFromRegistryApplicationFailed()
return null;
});
}
+
+ public void testRecoverWithBadSubCluster(final RegistryOperations
registryObj) throws IOException, InterruptedException {
+ UserGroupInformation ugi =
+ interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+ ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
Review Comment:
Since this function is so long, would it make sense to define it as a separate
method and call that, instead of using a huge nested lambda?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]