tillrohrmann commented on a change in pull request #10717:
[FLINK-15369][runtime] MiniCluster use fixed network / managed memory sizes by default
URL: https://github.com/apache/flink/pull/10717#discussion_r362803458
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
##########
@@ -668,24 +679,23 @@ MemorySize getTotalJvmMetaspaceAndOverheadSize() {
         }
     }
-    public static Configuration adjustConfigurationForLocalExecution(final Configuration configuration, int numberOfTaskExecutors) {
-
-        final long jvmFreeHeapMemBytesPerTm = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() / numberOfTaskExecutors;
-        final MemorySize totalHeapMemory = new MemorySize(jvmFreeHeapMemBytesPerTm);
-
-        final MemorySize frameworkOffHeap = getFrameworkOffHeapMemorySize(configuration);
-        final MemorySize taskOffHeap = getTaskOffHeapMemorySize(configuration);
-        final MemorySize totalFlinkMemoryExceptShuffleAndManaged = totalHeapMemory.add(frameworkOffHeap).add(taskOffHeap);
-
-        final double shuffleFraction = getShuffleMemoryRangeFraction(configuration).fraction;
-        final double managedFraction = getManagedMemoryRangeFraction(configuration).fraction;
+    public static Configuration adjustConfigurationForLocalExecution(final Configuration configuration) {
+        final Configuration modifiedConfig = new Configuration(configuration);
-        final MemorySize estimatedTotalFlinkMemory = totalFlinkMemoryExceptShuffleAndManaged
-            .multiply(1 / (1 - shuffleFraction - managedFraction));
+        if (!isTaskExecutorResourceExplicitlyConfigured(configuration)) {
+            // This does not affect the JVM heap size for local execution,
+            // we simply set it to pass the sanity checks in memory calculations
+            modifiedConfig.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("100m"));
+        }
-        final Configuration modifiedConfig = new Configuration(configuration);
-        modifiedConfig.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, estimatedTotalFlinkMemory);
+        if (!isShuffleMemoryExplicitlyConfigured(configuration)) {
+            modifiedConfig.set(TaskManagerOptions.SHUFFLE_MEMORY_MIN, MemorySize.parse("64m"));
+            modifiedConfig.set(TaskManagerOptions.SHUFFLE_MEMORY_MAX, MemorySize.parse("64m"));
+        }
+        if (!isManagedMemorySizeExplicitlyConfigured(configuration)) {
+            modifiedConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("16m"));
+        }
Review comment:
Which other class will use this method in the foreseeable future? Once it
becomes necessary to use this method in other places, I'm OK with making it
public. At the moment, however, it should only be used by the `MiniCluster`,
so let's move it there. This would also increase code cohesion.
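
A minimal sketch of the suggested move, under stated assumptions: `MiniClusterSketch` is a hypothetical stand-in for `MiniCluster`, a plain `Map<String, String>` stands in for Flink's `Configuration`, and the key strings are placeholders rather than the real `TaskManagerOptions` keys. The "explicitly configured" checks are simplified to key lookups; the real checks in `TaskExecutorResourceUtils` may cover more options. The point is only that the helper is private to its single caller.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MiniCluster; not the actual Flink class.
final class MiniClusterSketch {

    // Placeholder keys, assumed for illustration only.
    static final String TASK_HEAP = "task.heap.size";
    static final String SHUFFLE_MIN = "shuffle.min";
    static final String SHUFFLE_MAX = "shuffle.max";
    static final String MANAGED_SIZE = "managed.size";

    // The only entry point that needs the adjusted configuration.
    static Map<String, String> effectiveConfiguration(Map<String, String> userConfig) {
        return adjustConfigurationForLocalExecution(userConfig);
    }

    // Private: nothing outside this class can call it, which is the
    // visibility the review comment asks for.
    private static Map<String, String> adjustConfigurationForLocalExecution(
            Map<String, String> configuration) {
        // Copy first so the caller's configuration is never mutated.
        Map<String, String> modified = new HashMap<>(configuration);

        // Simplified check: the real code asks whether task executor
        // resources are explicitly configured, which may cover more options.
        if (!configuration.containsKey(TASK_HEAP)) {
            modified.put(TASK_HEAP, "100m");
        }
        // Fix min and max to the same value only if neither was set.
        if (!configuration.containsKey(SHUFFLE_MIN)
                && !configuration.containsKey(SHUFFLE_MAX)) {
            modified.put(SHUFFLE_MIN, "64m");
            modified.put(SHUFFLE_MAX, "64m");
        }
        if (!configuration.containsKey(MANAGED_SIZE)) {
            modified.put(MANAGED_SIZE, "16m");
        }
        return modified;
    }
}
```

If a second caller appears later, widening the method's visibility is a one-word change, whereas narrowing a public method after others depend on it is not.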