Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6297#discussion_r202338296
--- Diff:
flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
---
@@ -29,6 +29,46 @@
private static final String[] EMPTY = new String[0];
+ /**
+ * Get job manager's heap memory.
+ *
+ * This method will check the new key {@link
JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
+ * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for
backwards compatibility.
+ *
+ * @param configuration the configuration object
+ * @return the memory size of job manager's heap memory.
+ */
+ public static MemorySize getJobManagerHeapMemory(Configuration
configuration) {
+ if
(configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
+ return
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
+ } else if
(configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key()))
{
+ return
MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB)
+ "m");
+ } else {
+ throw new RuntimeException("Can not find config key : "
+ JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key()
--- End diff --
Use `FlinkRuntimeException` here instead of the generic `RuntimeException`.
---