This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 246a8c5dbfbf652f56bad312a6e90362c3ffe9d7
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Mon Jun 22 19:09:00 2020 +0300

    [FLINK-18353] Make enabling of the JVM Direct Memory limit configurable for JM
    
    A JVM Direct Memory leak is unlikely in the JM. Therefore, we disable
    its limit by default. This way direct memory usage can spill over into
    e.g. JVM Overhead without failure, restoring the user experience from
    before FLIP-116. If a user needs the limit, e.g. to investigate container
    OOMs, it can be enabled by setting the
    'jobmanager.memory.enable-jvm-direct-memory-limit' option.
    
    This closes #12745.
---
 .../_includes/generated/common_memory_section.html |  8 ++++-
 .../generated/job_manager_configuration.html       |  8 ++++-
 docs/ops/memory/mem_migration.md                   | 13 +++++++
 docs/ops/memory/mem_setup.md                       | 15 ++++----
 docs/ops/memory/mem_setup_jobmanager.md            |  6 ++--
 docs/ops/memory/mem_trouble.md                     |  8 +++--
 .../flink/configuration/ConfigurationUtils.java    |  1 -
 .../flink/configuration/JobManagerOptions.java     | 27 +++++++++++++--
 .../decorators/JavaCmdJobManagerDecorator.java     |  6 ++--
 .../decorators/JavaCmdJobManagerDecoratorTest.java |  6 ++--
 .../runtime/jobmanager/JobManagerProcessUtils.java |  6 ++++
 .../flink/runtime/util/bash/BashJavaUtils.java     |  2 +-
 .../util/config/memory/ProcessMemoryUtils.java     | 20 ++++++++---
 .../config/memory/ProcessMemoryUtilsTestBase.java  | 40 +++++++++++++++-------
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  3 +-
 .../flink/yarn/YarnClusterDescriptorTest.java      |  4 +--
 16 files changed, 129 insertions(+), 44 deletions(-)

diff --git a/docs/_includes/generated/common_memory_section.html 
b/docs/_includes/generated/common_memory_section.html
index ce0878c..3b1df1b 100644
--- a/docs/_includes/generated/common_memory_section.html
+++ b/docs/_includes/generated/common_memory_section.html
@@ -9,6 +9,12 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>jobmanager.memory.enable-jvm-direct-memory-limit</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable the JVM direct memory limit of the 
JobManager process (-XX:MaxDirectMemorySize). The limit will be set to the 
value of 'jobmanager.memory.off-heap.size' option. </td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.memory.flink.size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>MemorySize</td>
@@ -48,7 +54,7 @@
             <td><h5>jobmanager.memory.off-heap.size</h5></td>
             <td style="word-wrap: break-word;">128 mb</td>
             <td>MemorySize</td>
-            <td>Off-heap Memory size for JobManager. The JVM direct memory 
limit of the Job Manager process (-XX:MaxDirectMemorySize) will be set to this 
value. This option covers all off-heap memory usage including direct and native 
memory allocation.</td>
+            <td>Off-heap Memory size for JobManager. This option covers all 
off-heap memory usage including direct and native memory allocation. The JVM 
direct memory limit of the JobManager process (-XX:MaxDirectMemorySize) will be 
set to this value if the limit is enabled by 
'jobmanager.memory.enable-jvm-direct-memory-limit'. </td>
         </tr>
         <tr>
             <td><h5>jobmanager.memory.process.size</h5></td>
diff --git a/docs/_includes/generated/job_manager_configuration.html 
b/docs/_includes/generated/job_manager_configuration.html
index 87341aa..988bcc6 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -33,6 +33,12 @@
             <td>This option specifies how the job computation recovers from 
task failures. Accepted values are:<ul><li>'full': Restarts all tasks to 
recover the job.</li><li>'region': Restarts all tasks that could be affected by 
the task failure. More details can be found <a 
href="../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy">here</a>.</li></ul></td>
         </tr>
         <tr>
+            <td><h5>jobmanager.memory.enable-jvm-direct-memory-limit</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable the JVM direct memory limit of the 
JobManager process (-XX:MaxDirectMemorySize). The limit will be set to the 
value of 'jobmanager.memory.off-heap.size' option. </td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.memory.flink.size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>MemorySize</td>
@@ -72,7 +78,7 @@
             <td><h5>jobmanager.memory.off-heap.size</h5></td>
             <td style="word-wrap: break-word;">128 mb</td>
             <td>MemorySize</td>
-            <td>Off-heap Memory size for JobManager. The JVM direct memory 
limit of the Job Manager process (-XX:MaxDirectMemorySize) will be set to this 
value. This option covers all off-heap memory usage including direct and native 
memory allocation.</td>
+            <td>Off-heap Memory size for JobManager. This option covers all 
off-heap memory usage including direct and native memory allocation. The JVM 
direct memory limit of the JobManager process (-XX:MaxDirectMemorySize) will be 
set to this value if the limit is enabled by 
'jobmanager.memory.enable-jvm-direct-memory-limit'. </td>
         </tr>
         <tr>
             <td><h5>jobmanager.memory.process.size</h5></td>
diff --git a/docs/ops/memory/mem_migration.md b/docs/ops/memory/mem_migration.md
index 64427de..0282579 100644
--- a/docs/ops/memory/mem_migration.md
+++ b/docs/ops/memory/mem_migration.md
@@ -239,6 +239,19 @@ is also derived as the rest of what is left after 
subtracting all other componen
 control over the [JVM Heap]({% link ops/memory/mem_setup_jobmanager.md 
%}#configure-jvm-heap) by adjusting the
 [`jobmanager.memory.heap.size`](../config.html#jobmanager-memory-heap-size) 
option.
 
+## Flink JVM process memory limits
+
+Since the *1.10* release, Flink sets the *JVM Metaspace* and *JVM Direct Memory* 
limits for the TaskManager process
+by adding the corresponding JVM arguments. Since the *1.11* release, Flink also 
sets the *JVM Metaspace* limit for the JobManager process.
+You can enable the *JVM Direct Memory* limit for the JobManager process by setting 
the
+[`jobmanager.memory.enable-jvm-direct-memory-limit`](../config.html#jobmanager-memory-enable-jvm-direct-memory-limit)
 option.
+See also [JVM parameters](mem_setup.html#jvm-parameters).
+
+Flink sets the mentioned JVM memory limits to simplify debugging of the 
corresponding memory leaks and avoid
+[the container out-of-memory 
errors](mem_trouble.html#container-memory-exceeded).
+See also the troubleshooting guide for details about the [JVM 
Metaspace](mem_trouble.html#outofmemoryerror-metaspace)
+and [JVM Direct 
Memory](mem_trouble.html#outofmemoryerror-direct-buffer-memory) 
*OutOfMemoryErrors*.
+
 ## Container Cut-Off Memory
 
 For containerized deployments, you could previously specify a cut-off memory. 
This memory could accommodate for unaccounted memory allocations.
diff --git a/docs/ops/memory/mem_setup.md b/docs/ops/memory/mem_setup.md
index cd4ee2d..b39cdab 100644
--- a/docs/ops/memory/mem_setup.md
+++ b/docs/ops/memory/mem_setup.md
@@ -94,13 +94,16 @@ Configuring other memory components also requires caution 
as it can produce furt
 Flink explicitly adds the following memory related JVM arguments while 
starting its processes, based on the configured
 or derived memory component sizes:
 
-| &nbsp;&nbsp;**JVM Arguments**&nbsp;&nbsp; | &nbsp;&nbsp;**Value for 
TaskManager**&nbsp;&nbsp; | &nbsp;&nbsp;**Value for JobManager**&nbsp;&nbsp; |
-| :---------------------------------------- | 
:------------------------------------------------- | 
:------------------------------------------------ |
-| *-Xmx* and *-Xms*                         | Framework + Task Heap Memory     
                  | JVM Heap Memory                                   |
-| *-XX:MaxDirectMemorySize*                 | Framework + Task Off-heap (*) + 
Network Memory     | Off-heap Memory (*)                               |
-| *-XX:MaxMetaspaceSize*                    | JVM Metaspace                    
                  | JVM Metaspace                                     |
+| &nbsp;&nbsp;**JVM Arguments**&nbsp;&nbsp;                                    
          | &nbsp;&nbsp;**Value for TaskManager**&nbsp;&nbsp;  | 
&nbsp;&nbsp;**Value for JobManager**&nbsp;&nbsp;  |
+| 
:-------------------------------------------------------------------------------------
 | :------------------------------------------------- | 
:------------------------------------------------ |
+| *-Xmx* and *-Xms*                                                            
          | Framework + Task Heap Memory                       | JVM Heap 
Memory                                   |
+| *-XX:MaxDirectMemorySize*<br/>(always added only for TaskManager, see note 
for JobManager) | Framework + Task Off-heap (\*) + Network Memory     | 
Off-heap Memory (\*),(\*\*)                          |
+| *-XX:MaxMetaspaceSize*                                                       
          | JVM Metaspace                                      | JVM Metaspace  
                                   |
 {:.table-bordered}
-(*) Notice, that the native non-direct usage of memory in user code can be 
also accounted for as a part of the off-heap memory.
+(\*) Notice, that the native non-direct usage of memory in user code can be 
also accounted for as a part of the off-heap memory.
+<br/>
+(\*\*) The *JVM Direct Memory* limit is added for the JobManager process only if 
the corresponding option
+[`jobmanager.memory.enable-jvm-direct-memory-limit`](../config.html#jobmanager-memory-enable-jvm-direct-memory-limit)
 is set.
 <br/><br/>
 
 Check also the detailed memory model for 
[TaskManager](mem_setup_tm.html#detailed-memory-model) and
diff --git a/docs/ops/memory/mem_setup_jobmanager.md 
b/docs/ops/memory/mem_setup_jobmanager.md
index c564da9..f0125d1 100644
--- a/docs/ops/memory/mem_setup_jobmanager.md
+++ b/docs/ops/memory/mem_setup_jobmanager.md
@@ -74,8 +74,10 @@ The Flink scripts and CLI set the *JVM Heap* size via the 
JVM parameters *-Xms*
 
 ### Configure Off-heap Memory
 
-The *Off-heap* memory component accounts for any type of *JVM direct memory* 
and *native memory* usage. Therefore, it
-is also set via the corresponding JVM argument: *-XX:MaxDirectMemorySize*, see 
also [JVM parameters](mem_setup.html#jvm-parameters).
+The *Off-heap* memory component accounts for any type of *JVM direct memory* 
and *native memory* usage. Therefore,
+you can also enable the *JVM Direct Memory* limit by setting the 
[`jobmanager.memory.enable-jvm-direct-memory-limit`](../config.html#jobmanager-memory-enable-jvm-direct-memory-limit)
 option.
+If this option is configured, Flink will set the limit to the *Off-heap* 
memory size via the corresponding JVM argument: *-XX:MaxDirectMemorySize*.
+See also [JVM parameters](mem_setup.html#jvm-parameters).
 
 The size of this component can be configured by 
[`jobmanager.memory.off-heap.size`](../config.html#jobmanager-memory-off-heap-size)
 option. This option can be tuned e.g. if the JobManager process throws 
‘OutOfMemoryError: Direct buffer memory’, see
diff --git a/docs/ops/memory/mem_trouble.md b/docs/ops/memory/mem_trouble.md
index 8cf940b..64c479b 100644
--- a/docs/ops/memory/mem_trouble.md
+++ b/docs/ops/memory/mem_trouble.md
@@ -37,7 +37,7 @@ greater than 1, etc.) or configuration conflicts. Check the 
documentation chapte
 The exception usually indicates that the *JVM Heap* is too small. You can try 
to increase the JVM Heap size
 by increasing [total memory](mem_setup.html#configure-total-memory). You can 
also directly increase
 [task heap memory](mem_setup_tm.html#task-operator-heap-memory) for 
TaskManagers or
-[JVM Heap memory]({% link ops/memory/mem_setup_jobmanager.md 
%}#configure-jvm-heap) for Masters.
+[JVM Heap memory]({% link ops/memory/mem_setup_jobmanager.md 
%}#configure-jvm-heap) for JobManagers.
 
 <span class="label label-info">Note</span> You can also increase the 
[framework heap memory](mem_setup_tm.html#framework-memory)
 for TaskManagers, but you should only change this option if you are sure the 
Flink framework itself needs more memory.
@@ -54,7 +54,7 @@ See also how to configure off-heap memory for 
[TaskManagers](mem_setup_tm.html#c
 
 The exception usually indicates that [JVM metaspace 
limit](mem_setup.html#jvm-parameters) is configured too small.
 You can try to increase the JVM metaspace option for 
[TaskManagers](../config.html#taskmanager-memory-jvm-metaspace-size)
-or [Masters](../config.html#jobmanager-memory-jvm-metaspace-size).
+or [JobManagers](../config.html#jobmanager-memory-jvm-metaspace-size).
 
 ## IOException: Insufficient number of network buffers
 
@@ -72,6 +72,10 @@ If a Flink container tries to allocate memory beyond its 
requested size (Yarn, M
 this usually indicates that Flink has not reserved enough native memory. You 
can observe this either by using an external
 monitoring system or from the error messages when a container gets killed by 
the deployment environment.
 
+If you encounter this problem in the *JobManager* process, you can also enable 
the *JVM Direct Memory* limit by setting the
+[`jobmanager.memory.enable-jvm-direct-memory-limit`](../config.html#jobmanager-memory-enable-jvm-direct-memory-limit)
 option
+to rule out a possible *JVM Direct Memory* leak.
+
 If [RocksDBStateBackend](../state/state_backends.html#the-rocksdbstatebackend) 
is used, and the memory controlling is disabled,
 you can try to increase the TaskManager's [managed 
memory](mem_setup.html#managed-memory).
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 4fbc512..d4ff3b4 100755
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -194,7 +194,6 @@ public class ConfigurationUtils {
 
                checkArgument(configs.containsKey(xmx));
                checkArgument(configs.containsKey(xms));
-               checkArgument(configs.containsKey(maxDirect));
                checkArgument(configs.containsKey(maxMetadata));
 
                return configs;
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index dad1497..aa3dc9a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -169,9 +169,30 @@ public class JobManagerOptions {
                key("jobmanager.memory.off-heap.size")
                        .memoryType()
                        .defaultValue(MemorySize.ofMebiBytes(128))
-                       .withDescription("Off-heap Memory size for JobManager. 
The JVM direct memory limit of the Job Manager " +
-                               "process (-XX:MaxDirectMemorySize) will be set 
to this value. This option covers all off-heap memory " +
-                               "usage including direct and native memory 
allocation.");
+                       .withDescription(Description
+                               .builder()
+                               .text(
+                                       "Off-heap Memory size for JobManager. 
This option covers all off-heap memory usage including " +
+                                               "direct and native memory 
allocation. The JVM direct memory limit of the JobManager process " +
+                                               "(-XX:MaxDirectMemorySize) will 
be set to this value if the limit is enabled by " +
+                                               
"'jobmanager.memory.enable-jvm-direct-memory-limit'. ")
+                               .build());
+
+       /**
+        * Whether to enable the JVM direct memory limit (-XX:MaxDirectMemorySize) of the JobManager process.
+        */
+       @Documentation.Section(Documentation.Sections.COMMON_MEMORY)
+       public static final ConfigOption<Boolean> 
JVM_DIRECT_MEMORY_LIMIT_ENABLED =
+               key("jobmanager.memory.enable-jvm-direct-memory-limit")
+                       .booleanType()
+                       .defaultValue(false)
+                       .withDescription(Description
+                               .builder()
+                               .text(
+                                       "Whether to enable the JVM direct 
memory limit of the JobManager process " +
+                                               "(-XX:MaxDirectMemorySize). The 
limit will be set to the value of '%s' option. ",
+                                       text(OFF_HEAP_MEMORY.key()))
+                               .build());
 
        /**
         * JVM Metaspace Size for the JobManager.
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java
index 134bfee..8102c6b 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java
@@ -25,8 +25,6 @@ import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerPar
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
-import org.apache.flink.runtime.util.config.memory.ProcessMemorySpec;
-import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ContainerBuilder;
@@ -84,13 +82,13 @@ public class JavaCmdJobManagerDecorator extends 
AbstractKubernetesStepDecorator
         */
        private static String getJobManagerStartCommand(
                        Configuration flinkConfig,
-                       ProcessMemorySpec jobManagerProcessSpec,
+                       JobManagerProcessSpec jobManagerProcessSpec,
                        String configDirectory,
                        String logDirectory,
                        boolean hasLogback,
                        boolean hasLog4j,
                        String mainClass) {
-               final String jvmMemOpts = 
ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec);
+               final String jvmMemOpts = 
JobManagerProcessUtils.generateJvmParametersStr(jobManagerProcessSpec, 
flinkConfig);
                return KubernetesUtils.getCommonStartCommand(
                        flinkConfig,
                        KubernetesUtils.ClusterComponent.JOB_MANAGER,
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java
index f236d47..32a12a1 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
-import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 
 import io.fabric8.kubernetes.api.model.Container;
 import org.junit.Test;
@@ -64,8 +63,9 @@ public class JavaCmdJobManagerDecoratorTest extends 
KubernetesJobManagerTestBase
                                        FLINK_LOG_DIR_IN_POD, 
FLINK_LOG_DIR_IN_POD);
 
        // Memory variables
-       private static final String jmJvmMem = 
ProcessMemoryUtils.generateJvmParametersStr(
-               
JobManagerProcessUtils.createDefaultJobManagerProcessSpec(JOB_MANAGER_MEMORY));
+       private final String jmJvmMem = 
JobManagerProcessUtils.generateJvmParametersStr(
+               
JobManagerProcessUtils.createDefaultJobManagerProcessSpec(JOB_MANAGER_MEMORY),
+               flinkConfig);
 
        private JavaCmdJobManagerDecorator javaCmdJobManagerDecorator;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
index d34b3bc..d5b1f8c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtils.java
@@ -93,4 +93,10 @@ public class JobManagerProcessUtils {
                configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(totalProcessMemoryMb));
                return processSpecFromConfig(configuration);
        }
+
+       public static String generateJvmParametersStr(JobManagerProcessSpec 
processSpec, Configuration configuration) {
+               return ProcessMemoryUtils.generateJvmParametersStr(
+                       processSpec,
+                       
configuration.getBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED));
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
index 9d53db0..079dde2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
@@ -105,7 +105,7 @@ public class BashJavaUtils {
 
                logMasterConfiguration(jobManagerProcessSpec);
 
-               return 
Collections.singletonList(ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec));
+               return 
Collections.singletonList(JobManagerProcessUtils.generateJvmParametersStr(jobManagerProcessSpec,
 configuration));
        }
 
        private static void logMasterConfiguration(JobManagerProcessSpec spec) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
index b154156..479d484 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
@@ -290,9 +290,21 @@ public class ProcessMemoryUtils<FM extends FlinkMemory> {
        }
 
        public static String generateJvmParametersStr(ProcessMemorySpec 
processSpec) {
-               return "-Xmx" + processSpec.getJvmHeapMemorySize().getBytes()
-                       + " -Xms" + 
processSpec.getJvmHeapMemorySize().getBytes()
-                       + " -XX:MaxDirectMemorySize=" + 
processSpec.getJvmDirectMemorySize().getBytes()
-                       + " -XX:MaxMetaspaceSize=" + 
processSpec.getJvmMetaspaceSize().getBytes();
+               return generateJvmParametersStr(processSpec, true);
+       }
+
+       public static String generateJvmParametersStr(ProcessMemorySpec 
processSpec, boolean enableDirectMemoryLimit) {
+               final StringBuilder jvmArgStr = new StringBuilder();
+
+               
jvmArgStr.append("-Xmx").append(processSpec.getJvmHeapMemorySize().getBytes());
+               jvmArgStr.append(" 
-Xms").append(processSpec.getJvmHeapMemorySize().getBytes());
+
+               if (enableDirectMemoryLimit) {
+                       jvmArgStr.append(" 
-XX:MaxDirectMemorySize=").append(processSpec.getJvmDirectMemorySize().getBytes());
+               }
+
+               jvmArgStr.append(" 
-XX:MaxMetaspaceSize=").append(processSpec.getJvmMetaspaceSize().getBytes());
+
+               return jvmArgStr.toString();
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
index 301e2b6..b787df2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
@@ -76,20 +76,28 @@ public abstract class ProcessMemoryUtilsTestBase<T extends 
ProcessMemorySpec> ex
 
        @Test
        public void testGenerateJvmParameters() {
-               MemorySize heap = MemorySize.ofMebiBytes(1);
-               MemorySize directMemory = MemorySize.ofMebiBytes(2);
-               MemorySize metaspace = MemorySize.ofMebiBytes(3);
-               String jvmParamsStr = 
ProcessMemoryUtils.generateJvmParametersStr(new JvmArgTestingProcessMemorySpec(
-                       heap,
-                       directMemory,
-                       metaspace
-               ));
+               ProcessMemorySpec spec = 
JvmArgTestingProcessMemorySpec.generate();
+               String jvmParamsStr = 
ProcessMemoryUtils.generateJvmParametersStr(spec, true);
                Map<String, String> configs = 
ConfigurationUtils.parseJvmArgString(jvmParamsStr);
 
-               assertThat(MemorySize.parse(configs.get("-Xmx")), is(heap));
-               assertThat(MemorySize.parse(configs.get("-Xms")), is(heap));
-               
assertThat(MemorySize.parse(configs.get("-XX:MaxDirectMemorySize=")), 
is(directMemory));
-               
assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")), 
is(metaspace));
+               assertThat(configs.size(), is(4));
+               assertThat(MemorySize.parse(configs.get("-Xmx")), 
is(spec.getJvmHeapMemorySize()));
+               assertThat(MemorySize.parse(configs.get("-Xms")), 
is(spec.getJvmHeapMemorySize()));
+               
assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")), 
is(spec.getJvmMetaspaceSize()));
+               
assertThat(MemorySize.parse(configs.get("-XX:MaxDirectMemorySize=")), 
is(spec.getJvmDirectMemorySize()));
+       }
+
+       @Test
+       public void testGenerateJvmParametersWithoutDirectMemoryLimit() {
+               ProcessMemorySpec spec = 
JvmArgTestingProcessMemorySpec.generate();
+               String jvmParamsStr = 
ProcessMemoryUtils.generateJvmParametersStr(spec, false);
+               Map<String, String> configs = 
ConfigurationUtils.parseJvmArgString(jvmParamsStr);
+
+               assertThat(configs.size(), is(3));
+               assertThat(MemorySize.parse(configs.get("-Xmx")), 
is(spec.getJvmHeapMemorySize()));
+               assertThat(MemorySize.parse(configs.get("-Xms")), 
is(spec.getJvmHeapMemorySize()));
+               
assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")), 
is(spec.getJvmMetaspaceSize()));
+               assertThat(configs.containsKey("-XX:MaxDirectMemorySize="), 
is(false));
        }
 
        @Test
@@ -380,5 +388,13 @@ public abstract class ProcessMemoryUtilsTestBase<T extends 
ProcessMemorySpec> ex
                public MemorySize getTotalProcessMemorySize() {
                        throw new UnsupportedOperationException();
                }
+
+               public static JvmArgTestingProcessMemorySpec generate() {
+                       return new JvmArgTestingProcessMemorySpec(
+                               MemorySize.ofMebiBytes(1),
+                               MemorySize.ofMebiBytes(2),
+                               MemorySize.ofMebiBytes(3)
+                       );
+               }
        }
 }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 52ac1b1..c98139c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -49,7 +49,6 @@ import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.runtime.util.HadoopUtils;
-import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -1446,7 +1445,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                final  Map<String, String> startCommandValues = new HashMap<>();
                startCommandValues.put("java", "$JAVA_HOME/bin/java");
 
-               String jvmHeapMem = 
ProcessMemoryUtils.generateJvmParametersStr(processSpec);
+               String jvmHeapMem = 
JobManagerProcessUtils.generateJvmParametersStr(processSpec, 
flinkConfiguration);
                startCommandValues.put("jvmmem", jvmHeapMem);
 
                startCommandValues.put("jvmopts", javaOpts);
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 9f845ee..83784fb 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
-import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
@@ -166,7 +166,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
                final JobManagerProcessSpec jobManagerProcessSpec = 
createDefaultJobManagerProcessSpec(1024);
                final String java = "$JAVA_HOME/bin/java";
-               final String jvmmem = 
ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec);
+               final String jvmmem = 
JobManagerProcessUtils.generateJvmParametersStr(jobManagerProcessSpec, cfg);
                final String jvmOpts = "-Djvm"; // if set
                final String jmJvmOpts = "-DjmJvm"; // if set
                final String krb5 = "-Djava.security.krb5.conf=krb5.conf";

Reply via email to