GitHub user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5448#discussion_r191198882
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
---
@@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration
         final long heapSizeMB;
         if (useOffHeap) {
-            long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+            long offHeapSize = 0;
+            try {
+                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
+                if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
+                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+                }
+            } catch (IllegalArgumentException e) {
+
--- End diff --
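I found the flow around the default value somewhat counterintuitive: the
default is parsed first, inside the try block, and then conditionally
overwritten. How about we structure this code like this instead: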
    long offHeapSize;
    String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
    if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
        try {
            offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
                .getMebiBytes();
        } catch (IllegalArgumentException e) {
            throw new IllegalConfigurationException(
                "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
        }
    } else {
        offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
    }
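
To make the intended control flow easier to try out in isolation, here is a
minimal, self-contained sketch of the same structure with no Flink
dependency. Everything in it is an assumption for illustration only:
`ManagedMemorySketch`, `parseMebiBytes` and `resolveOffHeapSizeMB` are
hypothetical names, `parseMebiBytes` is a simplified stand-in for
`MemorySize.parse(...).getMebiBytes()` that only understands "m" and "g"
suffixes, `IllegalStateException` stands in for
`IllegalConfigurationException`, and the default value "0" used in `main`
is a placeholder, not necessarily the real default of the option.

```java
import java.util.Locale;

public class ManagedMemorySketch {

    /**
     * Hypothetical, simplified stand-in for MemorySize.parse(text).getMebiBytes().
     * Accepts only a whole number followed by "m" (mebibytes) or "g" (gibibytes).
     */
    public static long parseMebiBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        long factor;
        if (s.endsWith("m")) {
            factor = 1L;
        } else if (s.endsWith("g")) {
            factor = 1024L;
        } else {
            throw new IllegalArgumentException("Unsupported or missing unit in '" + text + "'");
        }
        String digits = s.substring(0, s.length() - 1).trim();
        // NumberFormatException is a subclass of IllegalArgumentException.
        return Long.parseLong(digits) * factor;
    }

    /**
     * Mirrors the suggested structure: parse the configured value only when it
     * differs from the default, otherwise read the default as a plain number.
     */
    public static long resolveOffHeapSizeMB(String configured, String defaultVal) {
        if (!configured.equals(defaultVal)) {
            try {
                return parseMebiBytes(configured);
            } catch (IllegalArgumentException e) {
                // Stand-in for IllegalConfigurationException in the real code.
                throw new IllegalStateException(
                    "Could not read managed memory size: " + configured, e);
            }
        } else {
            return Long.parseLong(defaultVal);
        }
    }

    public static void main(String[] args) {
        String assumedDefault = "0"; // placeholder default, an assumption
        System.out.println(resolveOffHeapSizeMB("64m", assumedDefault));
        System.out.println(resolveOffHeapSizeMB("2g", assumedDefault));
        System.out.println(resolveOffHeapSizeMB("0", assumedDefault));
    }
}
```

With this shape the try block covers only the user-supplied value, so a
malformed setting is reported against the option instead of being mixed up
with parsing of the default.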
---