This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch flip116
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa9e04545789a0bff76d7832113b9420203aeb2e
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Wed Apr 8 18:31:12 2020 +0300

    [FLINK-16745][k8s] Start Kubernetes JM with FLIP-116 JVM memory args
---
 .../test-scripts/test_kubernetes_session.sh        |  2 +-
 .../decorators/JavaCmdJobManagerDecorator.java     | 12 +++++------
 .../parameters/KubernetesJobManagerParameters.java |  5 +++++
 .../kubernetes/cli/KubernetesSessionCliTest.java   | 23 +++++++++++-----------
 .../kubeclient/KubernetesJobManagerTestBase.java   |  6 +++++-
 .../decorators/JavaCmdJobManagerDecoratorTest.java |  4 ++--
 .../KubernetesJobManagerParametersTest.java        |  8 +++++---
 7 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_kubernetes_session.sh b/flink-end-to-end-tests/test-scripts/test_kubernetes_session.sh
index f459a4f..e28f024 100755
--- a/flink-end-to-end-tests/test-scripts/test_kubernetes_session.sh
+++ b/flink-end-to-end-tests/test-scripts/test_kubernetes_session.sh
@@ -64,7 +64,7 @@ mkdir -p "$(dirname $LOCAL_OUTPUT_PATH)"
 # Set the memory and cpu smaller than default, so that the jobmanager and taskmanager pods could be allocated in minikube.
 OUTPUT=`"$FLINK_DIR"/bin/kubernetes-session.sh -Dkubernetes.cluster-id=${CLUSTER_ID} \
     -Dkubernetes.container.image=${FLINK_IMAGE_NAME} \
-    -Djobmanager.heap.size=512m \
+    -Djobmanager.memory.process.size=768m \
     -Dcontainerized.heap-cutoff-min=100 \
     -Dkubernetes.jobmanager.cpu=0.5 \
     -Dkubernetes.taskmanager.cpu=0.5 \
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java
index 11900f4..39eb0b9 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java
@@ -22,7 +22,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.util.config.memory.ProcessMemorySpec;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ContainerBuilder;
@@ -46,7 +47,7 @@ public class JavaCmdJobManagerDecorator extends 
AbstractKubernetesStepDecorator
        public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
                final String startCommand = getJobManagerStartCommand(
                        kubernetesJobManagerParameters.getFlinkConfiguration(),
-                       kubernetesJobManagerParameters.getJobManagerMemoryMB(),
+                       
kubernetesJobManagerParameters.getJobManagerProcessSpec(),
                        kubernetesJobManagerParameters.getFlinkConfDirInPod(),
                        kubernetesJobManagerParameters.getFlinkLogDirInPod(),
                        kubernetesJobManagerParameters.hasLogback(),
@@ -67,7 +68,7 @@ public class JavaCmdJobManagerDecorator extends 
AbstractKubernetesStepDecorator
         * Generates the shell command to start a jobmanager for kubernetes.
         *
         * @param flinkConfig The Flink configuration.
-        * @param jobManagerMemoryMb JobManager heap size.
+        * @param jobManagerProcessSpec JobManager process memory spec.
         * @param configDirectory The configuration directory for the 
flink-conf.yaml
         * @param logDirectory The log directory.
         * @param hasLogback Uses logback?
@@ -77,14 +78,13 @@ public class JavaCmdJobManagerDecorator extends 
AbstractKubernetesStepDecorator
         */
        private static String getJobManagerStartCommand(
                        Configuration flinkConfig,
-                       int jobManagerMemoryMb,
+                       ProcessMemorySpec jobManagerProcessSpec,
                        String configDirectory,
                        String logDirectory,
                        boolean hasLogback,
                        boolean hasLog4j,
                        String mainClass) {
-               final int heapSize = 
BootstrapTools.calculateHeapSize(jobManagerMemoryMb, flinkConfig);
-               final String jvmMemOpts = String.format("-Xms%sm -Xmx%sm", 
heapSize, heapSize);
+               final String jvmMemOpts = 
ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec);
                return KubernetesUtils.getCommonStartCommand(
                        flinkConfig,
                        KubernetesUtils.ClusterComponent.JOB_MANAGER,
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
index 6540cdb..d4e81d1 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -89,6 +90,10 @@ public class KubernetesJobManagerParameters extends 
AbstractKubernetesParameters
                return JOB_MANAGER_MAIN_CONTAINER_NAME;
        }
 
+       public JobManagerProcessSpec getJobManagerProcessSpec() {
+               return clusterSpecification.getMasterProcessSpec();
+       }
+
        public int getJobManagerMemoryMB() {
                return clusterSpecification.getMasterMemoryMB();
        }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
index a7bf478..df7fe6a 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
@@ -84,7 +84,7 @@ public class KubernetesSessionCliTest {
                                "-e", KubernetesSessionClusterExecutor.NAME,
                                "-D" + TaskManagerOptions.NUM_TASK_SLOTS.key() 
+ "=3"};
 
-               final KubernetesSessionCli cli = 
createFlinkKubernetesCustomCliWithTmTotalMemory(1234);
+               final KubernetesSessionCli cli = 
createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1234);
 
                final Configuration executorConfig = 
cli.getEffectiveConfiguration(params);
                final ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
@@ -96,7 +96,7 @@ public class KubernetesSessionCliTest {
 
        @Test
        public void testResumeFromKubernetesID() throws Exception {
-               final KubernetesSessionCli cli = 
createFlinkKubernetesCustomCliWithTmTotalMemory(1024);
+               final KubernetesSessionCli cli = 
createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024);
 
                final String clusterId = "my-test-CLUSTER_ID";
                final String[] args = new String[] {
@@ -120,13 +120,13 @@ public class KubernetesSessionCliTest {
                final int taskManagerMemory = 7331;
                final int slotsPerTaskManager = 30;
 
-               configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
MemorySize.ofMebiBytes(jobManagerMemory));
+               configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(jobManagerMemory));
                configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(taskManagerMemory));
                configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
slotsPerTaskManager);
 
                final String[] args = {
                                "-e", KubernetesSessionClusterExecutor.NAME,
-                               "-D" + 
JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key() + "=" + jobManagerMemory + "m",
+                               "-D" + 
JobManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=" + jobManagerMemory + "m",
                                "-D" + 
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=" + taskManagerMemory + "m",
                                "-D" + TaskManagerOptions.NUM_TASK_SLOTS.key() 
+ "=" + slotsPerTaskManager
                };
@@ -150,7 +150,7 @@ public class KubernetesSessionCliTest {
        public void testConfigurationClusterSpecification() throws Exception {
                final Configuration configuration = new Configuration();
                final int jobManagerMemory = 1337;
-               configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
MemorySize.ofMebiBytes(jobManagerMemory));
+               configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(jobManagerMemory));
                final int taskManagerMemory = 7331;
                configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(taskManagerMemory));
                final int slotsPerTaskManager = 42;
@@ -175,11 +175,11 @@ public class KubernetesSessionCliTest {
        public void testHeapMemoryPropertyWithUnitMB() throws Exception {
                final String[] args = new String[] {
                                "-e", KubernetesSessionClusterExecutor.NAME,
-                               "-D" + 
JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key() + "=1024m",
+                               "-D" + 
JobManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=1024m",
                                "-D" + 
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=2048m"
                };
 
-               final KubernetesSessionCli cli = 
createFlinkKubernetesCustomCliWithTmTotalMemory(1024);
+               final KubernetesSessionCli cli = 
createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024);
 
                final Configuration executorConfig = 
cli.getEffectiveConfiguration(args);
                final ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
@@ -196,11 +196,11 @@ public class KubernetesSessionCliTest {
        public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
                final String[] args = new String[] {
                                "-e", KubernetesSessionClusterExecutor.NAME,
-                               "-D" + 
JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key() + "=1g",
+                               "-D" + 
JobManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=1g",
                                "-D" + 
TaskManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=3g"
                };
 
-               final KubernetesSessionCli cli = 
createFlinkKubernetesCustomCliWithTmTotalMemory(1024);
+               final KubernetesSessionCli cli = 
createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024);
 
                final Configuration executorConfig = 
cli.getEffectiveConfiguration(args);
                final ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
@@ -239,7 +239,7 @@ public class KubernetesSessionCliTest {
                                "-e", KubernetesSessionClusterExecutor.NAME
                };
 
-               final KubernetesSessionCli cli = 
createFlinkKubernetesCustomCliWithTmTotalMemory(1024);
+               final KubernetesSessionCli cli = 
createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024);
 
                final Configuration executorConfig = 
cli.getEffectiveConfiguration(args);
                final ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
@@ -254,8 +254,9 @@ public class KubernetesSessionCliTest {
                return 
clusterClientServiceLoader.getClusterClientFactory(executorConfig);
        }
 
-       private KubernetesSessionCli 
createFlinkKubernetesCustomCliWithTmTotalMemory(int totalMemory) {
+       private KubernetesSessionCli 
createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(int totalMemory) {
                Configuration configuration = new Configuration();
+               configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(totalMemory));
                configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(totalMemory));
                return new KubernetesSessionCli(configuration);
        }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
index c063a6b..3c03f7b 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java
@@ -26,12 +26,15 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.KubernetesTestBase;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 
 import org.junit.Before;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec;
+
 /**
  * Base test class for the JobManager side.
  */
@@ -39,6 +42,7 @@ public class KubernetesJobManagerTestBase extends 
KubernetesTestBase {
 
        protected static final double JOB_MANAGER_CPU = 2.0;
        protected static final int JOB_MANAGER_MEMORY = 768;
+       protected static final JobManagerProcessSpec JOB_MANAGER_PROCESS_SPEC = 
createDefaultJobManagerProcessSpec(JOB_MANAGER_MEMORY);
 
        protected static final int REST_PORT = 9081;
        protected static final int RPC_PORT = 7123;
@@ -83,7 +87,7 @@ public class KubernetesJobManagerTestBase extends 
KubernetesTestBase {
                
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, 
nodeSelector);
 
                final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
-                       .setMasterMemoryMB(JOB_MANAGER_MEMORY)
+                       .setMasterProcessSpec(JOB_MANAGER_PROCESS_SPEC)
                        .setTaskManagerMemoryMB(1024)
                        .setSlotsPerTaskManager(3)
                        .createClusterSpecification();
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java
index 36ccd1e..fbf0295 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal
 import 
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 
 import io.fabric8.kubernetes.api.model.Container;
 import org.junit.Before;
@@ -63,8 +64,7 @@ public class JavaCmdJobManagerDecoratorTest extends 
KubernetesJobManagerTestBase
                                        FLINK_LOG_DIR_IN_POD, 
FLINK_LOG_DIR_IN_POD);
 
        // Memory variables
-       private static final String jmJvmMem = String.format("-Xms%dm -Xmx%dm",
-                       JOB_MANAGER_MEMORY - 600, JOB_MANAGER_MEMORY - 600);
+       private static final String jmJvmMem = 
ProcessMemoryUtils.generateJvmParametersStr(JOB_MANAGER_PROCESS_SPEC);
 
        private JavaCmdJobManagerDecorator javaCmdJobManagerDecorator;
 
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
index 00f3bae..91f874b 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.kubernetes.KubernetesTestBase;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.junit.Test;
@@ -47,11 +49,11 @@ import static org.junit.Assert.fail;
  */
 public class KubernetesJobManagerParametersTest extends KubernetesTestBase {
 
-       private static final int JOB_MANAGER_MEMORY = 768;
+       private static final JobManagerProcessSpec JOB_MANAGER_PROCESS_SPEC = 
JobManagerProcessUtils.createDefaultJobManagerProcessSpec(768);
        private static final double JOB_MANAGER_CPU = 2.0;
 
        private final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
-               .setMasterMemoryMB(JOB_MANAGER_MEMORY)
+               .setMasterProcessSpec(JOB_MANAGER_PROCESS_SPEC)
                .setTaskManagerMemoryMB(1024)
                .setSlotsPerTaskManager(1)
                .createClusterSpecification();
@@ -93,7 +95,7 @@ public class KubernetesJobManagerParametersTest extends 
KubernetesTestBase {
 
        @Test
        public void testGetJobManagerMemoryMB() {
-               assertEquals(JOB_MANAGER_MEMORY, 
kubernetesJobManagerParameters.getJobManagerMemoryMB());
+               
assertThat(kubernetesJobManagerParameters.getJobManagerProcessSpec(), 
is(JOB_MANAGER_PROCESS_SPEC));
        }
 
        @Test

Reply via email to