Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4271#discussion_r125944216
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
---
@@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws
Exception {
taskManagerMemoryMb = yarnMinAllocationMB;
}
- // Create application via yarnClient
- final YarnClientApplication yarnApplication =
yarnClient.createApplication();
- GetNewApplicationResponse appResponse =
yarnApplication.getNewApplicationResponse();
-
- Resource maxRes = appResponse.getMaximumResourceCapability();
final String note = "Please check the
'yarn.scheduler.maximum-allocation-mb' and the
'yarn.nodemanager.resource.memory-mb' configuration values\n";
- if (jobManagerMemoryMb > maxRes.getMemory()) {
- failSessionDuringDeployment(yarnClient,
yarnApplication);
+ if (jobManagerMemoryMb > maximumResourceCapability.getMemory())
{
throw new YarnDeploymentException("The cluster does not
have the requested resources for the JobManager available!\n"
- + "Maximum Memory: " + maxRes.getMemory() + "MB
Requested: " + jobManagerMemoryMb + "MB. " + note);
+ + "Maximum Memory: " +
maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb +
"MB. " + note);
}
- if (taskManagerMemoryMb > maxRes.getMemory()) {
- failSessionDuringDeployment(yarnClient,
yarnApplication);
+ if (taskManagerMemoryMb >
maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not
have the requested resources for the TaskManagers available!\n"
- + "Maximum Memory: " + maxRes.getMemory() + "
Requested: " + taskManagerMemoryMb + "MB. " + note);
+ + "Maximum Memory: " +
maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb +
"MB. " + note);
}
final String noteRsc = "\nThe Flink YARN client will try to
allocate the YARN session, but maybe not all TaskManagers are " +
"connecting from the beginning because the resources
are currently not available in the cluster. " +
"The allocation might take more time than usual because
the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb +
taskManagerMemoryMb * taskManagerCount;
- ClusterResourceDescription freeClusterMem;
- try {
- freeClusterMem =
getCurrentFreeClusterResources(yarnClient);
- } catch (YarnException | IOException e) {
- failSessionDuringDeployment(yarnClient,
yarnApplication);
- throw new YarnDeploymentException("Could not retrieve
information about free cluster resources.", e);
- }
- if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+
+ if (freeClusterResources.totalFreeMemory < totalMemoryRequired)
{
LOG.warn("This YARN session requires " +
totalMemoryRequired + "MB of memory in the cluster. "
- + "There are currently only " +
freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
+ + "There are currently only " +
freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
}
- if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
+ if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the
TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
- + "the largest possible YARN container: " +
freeClusterMem.containerLimit + noteRsc);
+ + "the largest possible YARN container: " +
freeClusterResources.containerLimit + noteRsc);
}
- if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
+ if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the
JobManager (" + jobManagerMemoryMb + "MB) is more than "
- + "the largest possible YARN container: " +
freeClusterMem.containerLimit + noteRsc);
+ + "the largest possible YARN container: " +
freeClusterResources.containerLimit + noteRsc);
}
// ----------------- check if the requested containers fit into
the cluster.
- int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree,
freeClusterMem.nodeManagersFree.length);
+ int[] nmFree =
Arrays.copyOf(freeClusterResources.nodeManagersFree,
freeClusterResources.nodeManagersFree.length);
// first, allocate the jobManager somewhere.
if (!allocateResource(nmFree, jobManagerMemoryMb)) {
LOG.warn("Unable to find a NodeManager that can fit the
JobManager/Application master. " +
"The JobManager requires " + jobManagerMemoryMb
+ "MB. NodeManagers available: " +
-
Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
+
Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
}
// allocate TaskManagers
for (int i = 0; i < taskManagerCount; i++) {
if (!allocateResource(nmFree, taskManagerMemoryMb)) {
LOG.warn("There is not enough memory available
in the YARN cluster. " +
"The TaskManager(s) require " +
taskManagerMemoryMb + "MB each. " +
- "NodeManagers available: " +
Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
+ "NodeManagers available: " +
Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
"After allocating the JobManager (" +
jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ")
TaskManagers, " +
"the following NodeManagers are
available: " + Arrays.toString(nmFree) + noteRsc);
}
}
- ApplicationReport report = startAppMaster(null, yarnClient,
yarnApplication);
+ return new ClusterSpecification(
+ jobManagerMemoryMb,
+ taskManagerMemoryMb,
+ clusterSpecification.getNumberTaskManagers(),
+ clusterSpecification.getSlotsPerTaskManager());
+
+ }
+
+ protected void logClusterSpecification(ClusterSpecification
clusterSpecification) {
--- End diff --
I would remove the `logClusterSpecification` method: it is only used once, and
calling it isn't really more readable than inlining its body at the call site.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---