xintongsong commented on a change in pull request #15936:
URL: https://github.com/apache/flink/pull/15936#discussion_r642887451
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtils.java
##########
@@ -154,6 +154,23 @@ public static TaskExecutorResourceSpec
resourceSpecFromConfigForLocalExecution(
return resourceSpecFromConfig(adjustForLocalExecution(config));
}
+ public static long calculateTotalFlinkMemoryFromFactors(Configuration
config) {
+ return config.get(TaskManagerOptions.TASK_HEAP_MEMORY)
+ .add(config.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY))
+ .add(config.get(TaskManagerOptions.NETWORK_MEMORY_MAX))
+ .add(config.get(TaskManagerOptions.MANAGED_MEMORY_SIZE))
+ .add(config.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY))
+ .add(config.get(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY))
+ .getBytes();
+ }
+
+ public static long calculateTotalProcessMemoryFromFactors(Configuration
config) {
+ return calculateTotalFlinkMemoryFromFactors(config)
+ + config.get(TaskManagerOptions.JVM_METASPACE)
+ .add(config.get(TaskManagerOptions.JVM_OVERHEAD_MAX))
+ .getBytes();
+ }
Review comment:
Better to add some assertions here, since the sums only use the `max` options for network and jvm-overhead:
- all component memory sizes are explicitly configured
- `max == min` for network & jvm-overhead
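A rough sketch of what such assertions could look like. This is not the actual Flink code: the class name, the plain `Map<String, Long>` standing in for `Configuration`, and the short option keys are all hypothetical and only illustrate the two checks listed above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComponentMemoryChecks {

    // Hypothetical keys standing in for the TaskManagerOptions memory options.
    static final List<String> REQUIRED = List.of(
            "task.heap", "task.off-heap",
            "network.min", "network.max",
            "managed",
            "framework.heap", "framework.off-heap",
            "jvm-metaspace",
            "jvm-overhead.min", "jvm-overhead.max");

    // Check 1: every component size must be explicitly configured.
    // Check 2: min == max for network and jvm-overhead, so that summing
    // only the max value gives a well-defined total.
    static void checkComponentsConfigured(Map<String, Long> config) {
        for (String key : REQUIRED) {
            if (!config.containsKey(key)) {
                throw new IllegalArgumentException(
                        "Memory size not explicitly configured: " + key);
            }
        }
        checkMinEqualsMax(config, "network");
        checkMinEqualsMax(config, "jvm-overhead");
    }

    static void checkMinEqualsMax(Map<String, Long> config, String component) {
        long min = config.get(component + ".min");
        long max = config.get(component + ".max");
        if (min != max) {
            throw new IllegalArgumentException(
                    component + " min (" + min + ") must equal max (" + max + ")");
        }
    }

    // Mirrors the shape of calculateTotalFlinkMemoryFromFactors in the diff,
    // but validates the assumptions before summing the six components.
    static long totalFlinkMemory(Map<String, Long> config) {
        checkComponentsConfigured(config);
        return config.get("task.heap")
                + config.get("task.off-heap")
                + config.get("network.max")
                + config.get("managed")
                + config.get("framework.heap")
                + config.get("framework.off-heap");
    }

    public static void main(String[] args) {
        Map<String, Long> config = new HashMap<>();
        for (String key : REQUIRED) {
            config.put(key, 10L);
        }
        System.out.println(totalFlinkMemory(config));
    }
}
```

With checks like these, a configuration that leaves a component unset, or sets different `min` and `max` values, fails fast instead of silently producing a total.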
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtils.java
##########
@@ -154,6 +154,23 @@ public static TaskExecutorResourceSpec
resourceSpecFromConfigForLocalExecution(
return resourceSpecFromConfig(adjustForLocalExecution(config));
}
+ public static long calculateTotalFlinkMemoryFromFactors(Configuration
config) {
Review comment:
```suggestion
public static long calculateTotalFlinkMemoryFromComponents(Configuration config) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtils.java
##########
@@ -154,6 +154,23 @@ public static TaskExecutorResourceSpec
resourceSpecFromConfigForLocalExecution(
return resourceSpecFromConfig(adjustForLocalExecution(config));
}
+ public static long calculateTotalFlinkMemoryFromFactors(Configuration
config) {
+ return config.get(TaskManagerOptions.TASK_HEAP_MEMORY)
+ .add(config.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY))
+ .add(config.get(TaskManagerOptions.NETWORK_MEMORY_MAX))
+ .add(config.get(TaskManagerOptions.MANAGED_MEMORY_SIZE))
+ .add(config.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY))
+ .add(config.get(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY))
+ .getBytes();
+ }
+
+ public static long calculateTotalProcessMemoryFromFactors(Configuration
config) {
Review comment:
```suggestion
public static long calculateTotalProcessMemoryFromComponents(Configuration config) {
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
##########
@@ -214,6 +214,8 @@ public void setup() throws IOException {
new BlobCacheService(new Configuration(), new VoidBlobStore(),
null);
configuration = new Configuration();
+ configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY,
MemorySize.ofMebiBytes(10));
+ configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE,
MemorySize.ofMebiBytes(10));
Review comment:
I think the assumption is that memory sizes for all components should be
explicitly configured before starting the TM process. The fact that the test
still passes with only these two options set indicates that we do not check
this assumption strictly.