[ 
https://issues.apache.org/jira/browse/FLINK-7113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076814#comment-16076814
 ] 

ASF GitHub Bot commented on FLINK-7113:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r125944668
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
    @@ -520,72  439,128 @@ protected YarnClusterClient deployInternal() throws 
Exception {
                        taskManagerMemoryMb =  yarnMinAllocationMB;
                }
     
    -           // Create application via yarnClient
    -           final YarnClientApplication yarnApplication = 
yarnClient.createApplication();
    -           GetNewApplicationResponse appResponse = 
yarnApplication.getNewApplicationResponse();
    -
    -           Resource maxRes = appResponse.getMaximumResourceCapability();
                final String note = "Please check the 
'yarn.scheduler.maximum-allocation-mb' and the 
'yarn.nodemanager.resource.memory-mb' configuration values\n";
    -           if (jobManagerMemoryMb > maxRes.getMemory()) {
    -                   failSessionDuringDeployment(yarnClient, 
yarnApplication);
                if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) 
{
                        throw new YarnDeploymentException("The cluster does not 
have the requested resources for the JobManager available!\n"
    -                             "Maximum Memory: "   maxRes.getMemory()   "MB 
Requested: "   jobManagerMemoryMb   "MB. "   note);
                                  "Maximum Memory: "   
maximumResourceCapability.getMemory()   "MB Requested: "   jobManagerMemoryMb   
"MB. "   note);
                }
     
    -           if (taskManagerMemoryMb > maxRes.getMemory()) {
    -                   failSessionDuringDeployment(yarnClient, 
yarnApplication);
                if (taskManagerMemoryMb > 
maximumResourceCapability.getMemory()) {
                        throw new YarnDeploymentException("The cluster does not 
have the requested resources for the TaskManagers available!\n"
    -                             "Maximum Memory: "   maxRes.getMemory()   " 
Requested: "   taskManagerMemoryMb   "MB. "   note);
                                  "Maximum Memory: "   
maximumResourceCapability.getMemory()   " Requested: "   taskManagerMemoryMb   
"MB. "   note);
                }
     
                final String noteRsc = "\nThe Flink YARN client will try to 
allocate the YARN session, but maybe not all TaskManagers are "  
                        "connecting from the beginning because the resources 
are currently not available in the cluster. "  
                        "The allocation might take more time than usual because 
the Flink YARN client needs to wait until "  
                        "the resources become available.";
                int totalMemoryRequired = jobManagerMemoryMb   
taskManagerMemoryMb * taskManagerCount;
    -           ClusterResourceDescription freeClusterMem;
    -           try {
    -                   freeClusterMem = 
getCurrentFreeClusterResources(yarnClient);
    -           } catch (YarnException | IOException e) {
    -                   failSessionDuringDeployment(yarnClient, 
yarnApplication);
    -                   throw new YarnDeploymentException("Could not retrieve 
information about free cluster resources.", e);
    -           }
     
    -           if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
     
                if (freeClusterResources.totalFreeMemory < totalMemoryRequired) 
{
                        LOG.warn("This YARN session requires "   
totalMemoryRequired   "MB of memory in the cluster. "
    -                             "There are currently only "   
freeClusterMem.totalFreeMemory   "MB available."   noteRsc);
                                  "There are currently only "   
freeClusterResources.totalFreeMemory   "MB available."   noteRsc);
     
                }
    -           if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
                if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
                        LOG.warn("The requested amount of memory for the 
TaskManagers ("   taskManagerMemoryMb   "MB) is more than "
    -                             "the largest possible YARN container: "   
freeClusterMem.containerLimit   noteRsc);
                                  "the largest possible YARN container: "   
freeClusterResources.containerLimit   noteRsc);
                }
    -           if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
                if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
                        LOG.warn("The requested amount of memory for the 
JobManager ("   jobManagerMemoryMb   "MB) is more than "
    -                             "the largest possible YARN container: "   
freeClusterMem.containerLimit   noteRsc);
                                  "the largest possible YARN container: "   
freeClusterResources.containerLimit   noteRsc);
                }
     
                // ----------------- check if the requested containers fit into 
the cluster.
     
    -           int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, 
freeClusterMem.nodeManagersFree.length);
                int[] nmFree = 
Arrays.copyOf(freeClusterResources.nodeManagersFree, 
freeClusterResources.nodeManagersFree.length);
                // first, allocate the jobManager somewhere.
                if (!allocateResource(nmFree, jobManagerMemoryMb)) {
                        LOG.warn("Unable to find a NodeManager that can fit the 
JobManager/Application master. "  
                                "The JobManager requires "   jobManagerMemoryMb 
  "MB. NodeManagers available: "  
    -                           
Arrays.toString(freeClusterMem.nodeManagersFree)   noteRsc);
                                
Arrays.toString(freeClusterResources.nodeManagersFree)   noteRsc);
                }
                // allocate TaskManagers
                for (int i = 0; i < taskManagerCount; i  ) {
                        if (!allocateResource(nmFree, taskManagerMemoryMb)) {
                                LOG.warn("There is not enough memory available 
in the YARN cluster. "  
                                        "The TaskManager(s) require "   
taskManagerMemoryMb   "MB each. "  
    -                                   "NodeManagers available: "   
Arrays.toString(freeClusterMem.nodeManagersFree)   "\n"  
                                        "NodeManagers available: "   
Arrays.toString(freeClusterResources.nodeManagersFree)   "\n"  
                                        "After allocating the JobManager ("   
jobManagerMemoryMb   "MB) and ("   i   "/"   taskManagerCount   ") 
TaskManagers, "  
                                        "the following NodeManagers are 
available: "   Arrays.toString(nmFree)    noteRsc);
                        }
                }
     
    -           ApplicationReport report = startAppMaster(null, yarnClient, 
yarnApplication);
                return new ClusterSpecification(
                        jobManagerMemoryMb,
                        taskManagerMemoryMb,
                        clusterSpecification.getNumberTaskManagers(),
                        clusterSpecification.getSlotsPerTaskManager());
     
        }
     
        protected void logClusterSpecification(ClusterSpecification 
clusterSpecification) {
                LOG.info("Cluster specification: {}", clusterSpecification);
        }
     
        /**
         * This method will block until the ApplicationMaster/JobManager have 
been
         * deployed on YARN.
         */
        protected YarnClusterClient deployInternal(ClusterSpecification 
clusterSpecification) throws Exception {
     
                isReadyForDeployment(clusterSpecification);
     
                final YarnClient yarnClient = getYarnClient();
     
                // ------------------ Check if the specified queue exists 
--------------------
     
                checkYarnQueues(yarnClient);
     
                // ------------------ Add dynamic properties to local 
flinkConfiguraton ------
                Map<String, String> dynProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
                for (Map.Entry<String, String> dynProperty : 
dynProperties.entrySet()) {
                        flinkConfiguration.setString(dynProperty.getKey(), 
dynProperty.getValue());
                }
     
    --- End diff --
    
    remove second empty line


> Make ClusterDescriptor independent of Flink cluster size
> --------------------------------------------------------
>
>                 Key: FLINK-7113
>                 URL: https://issues.apache.org/jira/browse/FLINK-7113
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> The {{ClusterDescriptor}} needs to know the size of the Flink cluster it is 
> supposed to deploy. As a consequence we have the 
> {{AbstractYarnClusterDescriptor}} which is configured with this information 
> via setters. I think it would be better to give the cluster size to the 
> {{ClusterDescriptor}} via the {{deploySession(ClusterSpecification)}} call. 
> That way we better decouple the deployment from the cluster configuration.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to