[
https://issues.apache.org/jira/browse/FLINK-7113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076814#comment-16076814
]
ASF GitHub Bot commented on FLINK-7113:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4271#discussion_r125944668
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
---
@@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws
Exception {
taskManagerMemoryMb = yarnMinAllocationMB;
}
- // Create application via yarnClient
- final YarnClientApplication yarnApplication =
yarnClient.createApplication();
- GetNewApplicationResponse appResponse =
yarnApplication.getNewApplicationResponse();
-
- Resource maxRes = appResponse.getMaximumResourceCapability();
final String note = "Please check the
'yarn.scheduler.maximum-allocation-mb' and the
'yarn.nodemanager.resource.memory-mb' configuration values\n";
- if (jobManagerMemoryMb > maxRes.getMemory()) {
- failSessionDuringDeployment(yarnClient,
yarnApplication);
if (jobManagerMemoryMb > maximumResourceCapability.getMemory())
{
throw new YarnDeploymentException("The cluster does not
have the requested resources for the JobManager available!\n"
- "Maximum Memory: " maxRes.getMemory() "MB
Requested: " jobManagerMemoryMb "MB. " note);
"Maximum Memory: "
maximumResourceCapability.getMemory() "MB Requested: " jobManagerMemoryMb
"MB. " note);
}
- if (taskManagerMemoryMb > maxRes.getMemory()) {
- failSessionDuringDeployment(yarnClient,
yarnApplication);
if (taskManagerMemoryMb >
maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not
have the requested resources for the TaskManagers available!\n"
- "Maximum Memory: " maxRes.getMemory() "
Requested: " taskManagerMemoryMb "MB. " note);
"Maximum Memory: "
maximumResourceCapability.getMemory() " Requested: " taskManagerMemoryMb
"MB. " note);
}
final String noteRsc = "\nThe Flink YARN client will try to
allocate the YARN session, but maybe not all TaskManagers are "
"connecting from the beginning because the resources
are currently not available in the cluster. "
"The allocation might take more time than usual because
the Flink YARN client needs to wait until "
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb
taskManagerMemoryMb * taskManagerCount;
- ClusterResourceDescription freeClusterMem;
- try {
- freeClusterMem =
getCurrentFreeClusterResources(yarnClient);
- } catch (YarnException | IOException e) {
- failSessionDuringDeployment(yarnClient,
yarnApplication);
- throw new YarnDeploymentException("Could not retrieve
information about free cluster resources.", e);
- }
- if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
if (freeClusterResources.totalFreeMemory < totalMemoryRequired)
{
LOG.warn("This YARN session requires "
totalMemoryRequired "MB of memory in the cluster. "
- "There are currently only "
freeClusterMem.totalFreeMemory "MB available." noteRsc);
"There are currently only "
freeClusterResources.totalFreeMemory "MB available." noteRsc);
}
- if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the
TaskManagers (" taskManagerMemoryMb "MB) is more than "
- "the largest possible YARN container: "
freeClusterMem.containerLimit noteRsc);
"the largest possible YARN container: "
freeClusterResources.containerLimit noteRsc);
}
- if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the
JobManager (" jobManagerMemoryMb "MB) is more than "
- "the largest possible YARN container: "
freeClusterMem.containerLimit noteRsc);
"the largest possible YARN container: "
freeClusterResources.containerLimit noteRsc);
}
// ----------------- check if the requested containers fit into
the cluster.
- int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree,
freeClusterMem.nodeManagersFree.length);
int[] nmFree =
Arrays.copyOf(freeClusterResources.nodeManagersFree,
freeClusterResources.nodeManagersFree.length);
// first, allocate the jobManager somewhere.
if (!allocateResource(nmFree, jobManagerMemoryMb)) {
LOG.warn("Unable to find a NodeManager that can fit the
JobManager/Application master. "
"The JobManager requires " jobManagerMemoryMb
"MB. NodeManagers available: "
-
Arrays.toString(freeClusterMem.nodeManagersFree) noteRsc);
Arrays.toString(freeClusterResources.nodeManagersFree) noteRsc);
}
// allocate TaskManagers
for (int i = 0; i < taskManagerCount; i ) {
if (!allocateResource(nmFree, taskManagerMemoryMb)) {
LOG.warn("There is not enough memory available
in the YARN cluster. "
"The TaskManager(s) require "
taskManagerMemoryMb "MB each. "
- "NodeManagers available: "
Arrays.toString(freeClusterMem.nodeManagersFree) "\n"
"NodeManagers available: "
Arrays.toString(freeClusterResources.nodeManagersFree) "\n"
"After allocating the JobManager ("
jobManagerMemoryMb "MB) and (" i "/" taskManagerCount ")
TaskManagers, "
"the following NodeManagers are
available: " Arrays.toString(nmFree) noteRsc);
}
}
- ApplicationReport report = startAppMaster(null, yarnClient,
yarnApplication);
return new ClusterSpecification(
jobManagerMemoryMb,
taskManagerMemoryMb,
clusterSpecification.getNumberTaskManagers(),
clusterSpecification.getSlotsPerTaskManager());
}
protected void logClusterSpecification(ClusterSpecification
clusterSpecification) {
LOG.info("Cluster specification: {}", clusterSpecification);
}
/**
* This method will block until the ApplicationMaster/JobManager have
been
* deployed on YARN.
*/
protected YarnClusterClient deployInternal(ClusterSpecification
clusterSpecification) throws Exception {
isReadyForDeployment(clusterSpecification);
final YarnClient yarnClient = getYarnClient();
// ------------------ Check if the specified queue exists
--------------------
checkYarnQueues(yarnClient);
// ------------------ Add dynamic properties to local
flinkConfiguraton ------
Map<String, String> dynProperties =
getDynamicProperties(dynamicPropertiesEncoded);
for (Map.Entry<String, String> dynProperty :
dynProperties.entrySet()) {
flinkConfiguration.setString(dynProperty.getKey(),
dynProperty.getValue());
}
--- End diff --
remove second empty line
> Make ClusterDescriptor independent of Flink cluster size
> --------------------------------------------------------
>
> Key: FLINK-7113
> URL: https://issues.apache.org/jira/browse/FLINK-7113
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> The {{ClusterDescriptor}} needs to know the size of the Flink cluster it is
> supposed to deploy. As a consequence we have the
> {{AbstractYarnClusterDescriptor}} which is configured with this information
> via setters. I think it would be better to give the cluster size to the
> {{ClusterDescriptor}} via the {{deploySession(ClusterSpecification)}} call.
> That way we better decouple the deployment from the cluster configuration.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)