[ 
https://issues.apache.org/jira/browse/FLINK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fa zheng updated FLINK-14560:
-----------------------------
    Description: 
If you accidentally set taskmanager.memory.size: 0 in flink-conf.yaml, Flink 
should size managed memory as a fixed fraction of the task manager JVM size. 
The related code is in TaskManagerServicesConfiguration.fromConfiguration:
{code:java}
// extract memory settings
long configuredMemory;
String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
   try {
      configuredMemory = MemorySize.parse(
         configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
   } catch (IllegalArgumentException e) {
      throw new IllegalConfigurationException(
         "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
   }
} else {
   configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
}
{code}
However, ActiveResourceManagerFactory.java rewrites the value as a byte count 
with a "b" suffix:
{code:java}
public static Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
   final int taskManagerMemoryMB =
      ConfigurationUtils.getTaskManagerHeapMemory(originalConfiguration).getMebiBytes();
   final long cutoffMB =
      ContaineredTaskManagerParameters.calculateCutoffMB(originalConfiguration, taskManagerMemoryMB);
   final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes
   final long managedMemoryBytes =
      TaskManagerServices.getManagedMemoryFromProcessMemory(originalConfiguration, processMemoryBytes);

   final Configuration resourceManagerConfig = new Configuration(originalConfiguration);
   resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");

   return resourceManagerConfig;
}
{code}
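
To make the unit conversion concrete, here is a minimal sketch of the two steps that matter for this issue: the megabyte-to-byte shift and the "b" suffix appended before the value is written back. The class and method names are hypothetical and do not exist in Flink; only the arithmetic mirrors the snippet above.

```java
public class ByteValueSketch {

    // Mirrors "(taskManagerMemoryMB - cutoffMB) << 20": 1 MB = 2^20 bytes.
    static long mbToBytes(long megabytes) {
        return megabytes << 20;
    }

    // Mirrors 'managedMemoryBytes + "b"': the value is stored as a string with a unit suffix.
    static String toConfigValue(long bytes) {
        return bytes + "b";
    }

    public static void main(String[] args) {
        // Example numbers are made up: 1024 MB heap minus a 256 MB cutoff.
        System.out.println(mbToBytes(1024 - 256)); // 805306368
        // A managed memory size of zero bytes is serialized as "0b", not "0".
        System.out.println(toConfigValue(0));      // 0b
    }
}
```

The point of the sketch is the second line of output: once the suffix is added, the stored string is no longer textually equal to a plain "0".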
 

As a result, 0 is rewritten to 0b, which no longer equals the default value. 
The 0b value then causes an error in the following check:
{code:java}
checkConfigParameter(
   configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
      configuredMemory > 0, configuredMemory,
   TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
   "MemoryManager needs at least one MB of memory. " +
      "If you leave this config parameter empty, the system automatically " +
      "pick a fraction of the available memory.");
{code}
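
The failure can be illustrated without Flink on the classpath. The following is a minimal sketch under stated assumptions, not Flink's implementation: the class, `toMebiBytes`, and `passesCheck` are hypothetical stand-ins, the parser handles only the three formats named in its comment, and the default value "0" is assumed from the description above.

```java
public class ManagedMemoryCheckSketch {

    // Assumed default of taskmanager.memory.size, as implied by the report.
    static final String DEFAULT_VALUE = "0";

    // Simplified stand-in for MemorySize.parse(...).getMebiBytes().
    // Handles only "<n>" (megabytes), "<n>m" (megabytes) and "<n>b" (bytes).
    static long toMebiBytes(String text) {
        String s = text.trim().toLowerCase();
        if (s.endsWith("b")) {
            return Long.parseLong(s.substring(0, s.length() - 1).trim()) >> 20;
        }
        if (s.endsWith("m")) {
            return Long.parseLong(s.substring(0, s.length() - 1).trim());
        }
        return Long.parseLong(s);
    }

    // Same boolean condition as the check above: the raw string equals the
    // default, or the parsed size is strictly positive.
    static boolean passesCheck(String configured) {
        long configuredMemory = configured.equals(DEFAULT_VALUE)
            ? Long.parseLong(DEFAULT_VALUE)
            : toMebiBytes(configured);
        return configured.equals(DEFAULT_VALUE) || configuredMemory > 0;
    }

    public static void main(String[] args) {
        System.out.println(passesCheck("0"));   // true: string equals the default
        System.out.println(passesCheck("0b"));  // false: not the default, and 0 MB
        System.out.println(passesCheck("64m")); // true: 64 MB > 0
    }
}
```

Because the comparison against the default is done on the raw string, "0" and "0b" take different branches even though both mean zero bytes. Comparing parsed sizes instead of strings would be one way to avoid the mismatch; whether that matches the actual fix is not stated in this message.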
 

 

  was:
If you accidentally set taskmanager.memory.size: 0 in flink-conf.yaml, Flink 
should size managed memory as a fixed fraction of the task manager JVM size. 
The related code is in TaskManagerServicesConfiguration.fromConfiguration:
{code:java}
// extract memory settings
long configuredMemory;
String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
   try {
      configuredMemory = MemorySize.parse(
         configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
   } catch (IllegalArgumentException e) {
      throw new IllegalConfigurationException(
         "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
   }
} else {
   configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
}
{code}
However, in FlinkYarnSessionCli.java, Flink translates the value to bytes.
{code:java}
// JobManager Memory
final int jobManagerMemoryMB =
   ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();

// Task Managers memory
final int taskManagerMemoryMB =
   ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
{code}
 

As a result, 0 is rewritten to 0b, which no longer equals the default value. 
The 0b value then causes an error in the following check:
{code:java}
checkConfigParameter(
   configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
      configuredMemory > 0, configuredMemory,
   TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
   "MemoryManager needs at least one MB of memory. " +
      "If you leave this config parameter empty, the system automatically " +
      "pick a fraction of the available memory.");
{code}
 

 


> The value of taskmanager.memory.size in flink-conf.yaml is set to zero will 
> cause taskmanager not to work 
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14560
>                 URL: https://issues.apache.org/jira/browse/FLINK-14560
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>    Affects Versions: 1.9.0, 1.9.1
>            Reporter: fa zheng
>            Assignee: fa zheng
>            Priority: Minor
>             Fix For: 1.10.0, 1.9.2
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
