Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r125944216
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
    @@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws 
Exception {
                        taskManagerMemoryMb =  yarnMinAllocationMB;
                }
     
    -           // Create application via yarnClient
    -           final YarnClientApplication yarnApplication = 
yarnClient.createApplication();
    -           GetNewApplicationResponse appResponse = 
yarnApplication.getNewApplicationResponse();
    -
    -           Resource maxRes = appResponse.getMaximumResourceCapability();
                final String note = "Please check the 
'yarn.scheduler.maximum-allocation-mb' and the 
'yarn.nodemanager.resource.memory-mb' configuration values\n";
    -           if (jobManagerMemoryMb > maxRes.getMemory()) {
    -                   failSessionDuringDeployment(yarnClient, 
yarnApplication);
    +           if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) 
{
                        throw new YarnDeploymentException("The cluster does not 
have the requested resources for the JobManager available!\n"
    -                           + "Maximum Memory: " + maxRes.getMemory() + "MB 
Requested: " + jobManagerMemoryMb + "MB. " + note);
    +                           + "Maximum Memory: " + 
maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + 
"MB. " + note);
                }
     
    -           if (taskManagerMemoryMb > maxRes.getMemory()) {
    -                   failSessionDuringDeployment(yarnClient, 
yarnApplication);
    +           if (taskManagerMemoryMb > 
maximumResourceCapability.getMemory()) {
                        throw new YarnDeploymentException("The cluster does not 
have the requested resources for the TaskManagers available!\n"
    -                           + "Maximum Memory: " + maxRes.getMemory() + " 
Requested: " + taskManagerMemoryMb + "MB. " + note);
    +                           + "Maximum Memory: " + 
maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + 
"MB. " + note);
                }
     
                final String noteRsc = "\nThe Flink YARN client will try to 
allocate the YARN session, but maybe not all TaskManagers are " +
                        "connecting from the beginning because the resources 
are currently not available in the cluster. " +
                        "The allocation might take more time than usual because 
the Flink YARN client needs to wait until " +
                        "the resources become available.";
                int totalMemoryRequired = jobManagerMemoryMb + 
taskManagerMemoryMb * taskManagerCount;
    -           ClusterResourceDescription freeClusterMem;
    -           try {
    -                   freeClusterMem = 
getCurrentFreeClusterResources(yarnClient);
    -           } catch (YarnException | IOException e) {
    -                   failSessionDuringDeployment(yarnClient, 
yarnApplication);
    -                   throw new YarnDeploymentException("Could not retrieve 
information about free cluster resources.", e);
    -           }
     
    -           if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
    +
    +           if (freeClusterResources.totalFreeMemory < totalMemoryRequired) 
{
                        LOG.warn("This YARN session requires " + 
totalMemoryRequired + "MB of memory in the cluster. "
    -                           + "There are currently only " + 
freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
    +                           + "There are currently only " + 
freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
     
                }
    -           if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
    +           if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
                        LOG.warn("The requested amount of memory for the 
TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
    -                           + "the largest possible YARN container: " + 
freeClusterMem.containerLimit + noteRsc);
    +                           + "the largest possible YARN container: " + 
freeClusterResources.containerLimit + noteRsc);
                }
    -           if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
    +           if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
                        LOG.warn("The requested amount of memory for the 
JobManager (" + jobManagerMemoryMb + "MB) is more than "
    -                           + "the largest possible YARN container: " + 
freeClusterMem.containerLimit + noteRsc);
    +                           + "the largest possible YARN container: " + 
freeClusterResources.containerLimit + noteRsc);
                }
     
                // ----------------- check if the requested containers fit into 
the cluster.
     
    -           int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, 
freeClusterMem.nodeManagersFree.length);
    +           int[] nmFree = 
Arrays.copyOf(freeClusterResources.nodeManagersFree, 
freeClusterResources.nodeManagersFree.length);
                // first, allocate the jobManager somewhere.
                if (!allocateResource(nmFree, jobManagerMemoryMb)) {
                        LOG.warn("Unable to find a NodeManager that can fit the 
JobManager/Application master. " +
                                "The JobManager requires " + jobManagerMemoryMb 
+ "MB. NodeManagers available: " +
    -                           
Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
    +                           
Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
                }
                // allocate TaskManagers
                for (int i = 0; i < taskManagerCount; i++) {
                        if (!allocateResource(nmFree, taskManagerMemoryMb)) {
                                LOG.warn("There is not enough memory available 
in the YARN cluster. " +
                                        "The TaskManager(s) require " + 
taskManagerMemoryMb + "MB each. " +
    -                                   "NodeManagers available: " + 
Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
    +                                   "NodeManagers available: " + 
Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
                                        "After allocating the JobManager (" + 
jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") 
TaskManagers, " +
                                        "the following NodeManagers are 
available: " + Arrays.toString(nmFree)  + noteRsc);
                        }
                }
     
    -           ApplicationReport report = startAppMaster(null, yarnClient, 
yarnApplication);
    +           return new ClusterSpecification(
    +                   jobManagerMemoryMb,
    +                   taskManagerMemoryMb,
    +                   clusterSpecification.getNumberTaskManagers(),
    +                   clusterSpecification.getSlotsPerTaskManager());
    +
    +   }
    +
    +   protected void logClusterSpecification(ClusterSpecification 
clusterSpecification) {
    --- End diff --
    
    I would remove this method, as it is only used once and isn't really more 
readable than the inline alternative.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to