[
https://issues.apache.org/jira/browse/FLINK-18350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139079#comment-17139079
]
Xintong Song commented on FLINK-18350:
--------------------------------------
Hi [~stevenz3wu],
The TM memory configurations are required on the JM/RM side for
containerized deployments (K8s/Yarn/Mesos), because the RM needs to decide
what container resources to request from the underlying external
system.
For the standalone deployment, Flink does not require TM memory configurations
on the JM/RM side. Each TM derives its own resources from its local
configuration.
In your case, it depends on whether your TMs are started by your custom
RM or by other systems/scripts, as in Flink's standalone deployment. For
the former, the RM needs enough configuration to decide
what resources the TMs should be started with. For the latter, you can take a
look at {{ArbitraryWorkerResourceSpecFactory}}, which is used by
{{StandaloneResourceManagerFactory}} and does not require TM memory
configurations.
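
To make the distinction concrete, here is a minimal, self-contained sketch of the two kinds of factory. It does not use Flink's classes: {{Spec}}, {{SpecFactory}}, {{FROM_CONFIG}}, {{ARBITRARY}} and {{parseMiB}} are hypothetical stand-ins, and the parser is deliberately simplified (it only accepts the m/mb and g/gb units), so treat it as an illustration of the idea rather than of the actual implementation.

```java
import java.util.Map;

// Illustrative stand-ins only; none of these types are Flink classes.
class WorkerSpecSketch {

    /** Hypothetical worker spec; only total process memory (MiB) is modeled. */
    static final class Spec {
        static final Spec UNSPECIFIED = new Spec(0);
        final long processMemoryMiB;

        Spec(long processMemoryMiB) {
            this.processMemoryMiB = processMemoryMiB;
        }
    }

    /** Hypothetical factory interface, playing the role of a worker resource spec factory. */
    interface SpecFactory {
        Spec createDefaultSpec(Map<String, String> config);
    }

    /** The RM starts the TMs itself, so it must read the TM memory size from its own config. */
    static final SpecFactory FROM_CONFIG = config -> {
        String raw = config.get("taskmanager.memory.process.size");
        if (raw == null) {
            throw new IllegalArgumentException("taskmanager.memory.process.size is not set");
        }
        return new Spec(parseMiB(raw));
    };

    /** The TMs are started externally, so the RM never looks at the TM memory options. */
    static final SpecFactory ARBITRARY = config -> Spec.UNSPECIFIED;

    /** Simplified parser (assumption: only m/mb and g/gb are accepted here). */
    static long parseMiB(String raw) {
        String s = raw.trim().toLowerCase();
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number found in '" + raw + "'");
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();
        switch (unit) {
            case "m":
            case "mb":
                return value;
            case "g":
            case "gb":
                return value * 1024;
            default:
                throw new IllegalArgumentException(
                        "Memory size unit '" + unit + "' is not recognized");
        }
    }

    public static void main(String[] args) {
        // Same malformed value as in the reported stack trace.
        Map<String, String> config = Map.of("taskmanager.memory.process.size", "7500}");

        // The externally-started case ignores the malformed value entirely.
        System.out.println(ARBITRARY.createDefaultSpec(config).processMemoryMiB);

        // The RM-started case has to parse it, and therefore fails at startup.
        try {
            FROM_CONFIG.createDefaultSpec(config);
        } catch (IllegalArgumentException e) {
            System.out.println("RM startup would fail: " + e.getMessage());
        }
    }
}
```

With a factory of the first kind, a malformed TM memory value on the JM/RM side surfaces at RM startup, as in the reported stack trace. With the second kind, the value is never read on that side.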
> [1.11.0] jobmanager requires taskmanager.memory.process.size config
> -------------------------------------------------------------------
>
> Key: FLINK-18350
> URL: https://issues.apache.org/jira/browse/FLINK-18350
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Configuration
> Affects Versions: 1.11.0
> Reporter: Steven Zhen Wu
> Priority: Critical
> Fix For: 1.11.0
>
>
>
> Saw this failure in jobmanager startup. I know the exception says that
> taskmanager.memory.process.size is misconfigured, which is a bug on our end.
> The bug wasn't discovered because taskmanager.memory.process.size was not
> required by the jobmanager before 1.11.
> But I am wondering why this is required by the jobmanager in session cluster
> mode. When a taskmanager registers with the jobmanager, it reports its
> resources (like CPU, memory, etc.). BTW, we set it properly on the
> taskmanager side in `flink-conf.yaml`.
> {code:java}
> 2020-06-17 18:06:25,079 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [main] - Could not start cluster entrypoint TitusSessionClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint TitusSessionClusterEntrypoint.
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:516)
>     at com.netflix.spaas.runtime.TitusSessionClusterEntrypoint.main(TitusSessionClusterEntrypoint.java:103)
> Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
>     at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:255)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>     ... 2 more
> Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot read memory size from config option 'taskmanager.memory.process.size'.
>     at org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.getMemorySizeFromConfig(ProcessMemoryUtils.java:234)
>     at org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.deriveProcessSpecWithTotalProcessMemory(ProcessMemoryUtils.java:100)
>     at org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.memoryProcessSpecFromConfig(ProcessMemoryUtils.java:79)
>     at org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.processSpecFromConfig(TaskExecutorProcessUtils.java:109)
>     at org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpecBuilder.build(TaskExecutorProcessSpecBuilder.java:58)
>     at org.apache.flink.runtime.resourcemanager.WorkerResourceSpecFactory.workerResourceSpecFromConfigAndCpu(WorkerResourceSpecFactory.java:37)
>     at com.netflix.spaas.runtime.resourcemanager.TitusWorkerResourceSpecFactory.createDefaultWorkerResourceSpec(TitusWorkerResourceSpecFactory.java:17)
>     at org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration.fromConfiguration(ResourceManagerRuntimeServicesConfiguration.java:67)
>     at com.netflix.spaas.runtime.resourcemanager.TitusResourceManagerFactory.createResourceManager(TitusResourceManagerFactory.java:53)
>     at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:167)
>     ... 9 more
> Caused by: java.lang.IllegalArgumentException: Could not parse value '7500}' for key 'taskmanager.memory.process.size'.
>     at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:753)
>     at org.apache.flink.configuration.Configuration.get(Configuration.java:738)
>     at org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.getMemorySizeFromConfig(ProcessMemoryUtils.java:232)
>     ... 18 more
> Caused by: java.lang.IllegalArgumentException: Memory size unit '}' does not match any of the recognized units: (b | bytes) / (k | kb | kibibytes) / (m | mb | mebibytes) / (g | gb | gibibytes) / (t | tb | tebibytes)
>     at org.apache.flink.configuration.MemorySize.parseUnit(MemorySize.java:331)
>     at org.apache.flink.configuration.MemorySize.parseBytes(MemorySize.java:306)
>     at org.apache.flink.configuration.MemorySize.parse(MemorySize.java:247)
>     at org.apache.flink.configuration.Configuration.convertToMemorySize(Configuration.java:951)
>     at org.apache.flink.configuration.Configuration.convertValue(Configuration.java:885)
>     at org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:750)
>     at java.util.Optional.map(Optional.java:215)
>     at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:750)
>     ... 20 more
> {code}
> We extend WorkerResourceSpecFactory, similar to
> KubernetesWorkerResourceSpecFactory.
> {code:java}
> public class TitusWorkerResourceSpecFactory extends WorkerResourceSpecFactory {
>     public static final TitusWorkerResourceSpecFactory INSTANCE =
>             new TitusWorkerResourceSpecFactory();
>
>     @Override
>     public WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configuration) {
>         return workerResourceSpecFromConfigAndCpu(configuration, getDefaultCpus(configuration));
>     }
>
>     @VisibleForTesting
>     static CPUResource getDefaultCpus(Configuration configuration) {
>         double fallback = Double.valueOf(System.getenv("TITUS_NUM_CPU"));
>         return TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback);
>     }
> }
> {code}
>