This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch flip116 in repository https://gitbox.apache.org/repos/asf/flink.git
commit aa9e04545789a0bff76d7832113b9420203aeb2e Author: Andrey Zagrebin <[email protected]> AuthorDate: Wed Apr 8 18:31:12 2020 +0300 [FLINK-16745][k8s] Start Kubernetes JM with FLIP-116 JVM memory args --- .../test-scripts/test_kubernetes_session.sh | 2 +- .../decorators/JavaCmdJobManagerDecorator.java | 12 +++++------ .../parameters/KubernetesJobManagerParameters.java | 5 +++++ .../kubernetes/cli/KubernetesSessionCliTest.java | 23 +++++++++++----------- .../kubeclient/KubernetesJobManagerTestBase.java | 6 +++++- .../decorators/JavaCmdJobManagerDecoratorTest.java | 4 ++-- .../KubernetesJobManagerParametersTest.java | 8 +++++--- 7 files changed, 36 insertions(+), 24 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_kubernetes_session.sh b/flink-end-to-end-tests/test-scripts/test_kubernetes_session.sh index f459a4f..e28f024 100755 --- a/flink-end-to-end-tests/test-scripts/test_kubernetes_session.sh +++ b/flink-end-to-end-tests/test-scripts/test_kubernetes_session.sh @@ -64,7 +64,7 @@ mkdir -p "$(dirname $LOCAL_OUTPUT_PATH)" # Set the memory and cpu smaller than default, so that the jobmanager and taskmanager pods could be allocated in minikube. 
OUTPUT=`"$FLINK_DIR"/bin/kubernetes-session.sh -Dkubernetes.cluster-id=${CLUSTER_ID} \ -Dkubernetes.container.image=${FLINK_IMAGE_NAME} \ - -Djobmanager.heap.size=512m \ + -Djobmanager.memory.process.size=768m \ -Dcontainerized.heap-cutoff-min=100 \ -Dkubernetes.jobmanager.cpu=0.5 \ -Dkubernetes.taskmanager.cpu=0.5 \ diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java index 11900f4..39eb0b9 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java @@ -22,7 +22,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.utils.KubernetesUtils; -import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.util.config.memory.ProcessMemorySpec; +import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; @@ -46,7 +47,7 @@ public class JavaCmdJobManagerDecorator extends AbstractKubernetesStepDecorator public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { final String startCommand = getJobManagerStartCommand( kubernetesJobManagerParameters.getFlinkConfiguration(), - kubernetesJobManagerParameters.getJobManagerMemoryMB(), + kubernetesJobManagerParameters.getJobManagerProcessSpec(), kubernetesJobManagerParameters.getFlinkConfDirInPod(), kubernetesJobManagerParameters.getFlinkLogDirInPod(), kubernetesJobManagerParameters.hasLogback(), @@ -67,7 +68,7 @@ public class 
JavaCmdJobManagerDecorator extends AbstractKubernetesStepDecorator * Generates the shell command to start a jobmanager for kubernetes. * * @param flinkConfig The Flink configuration. - * @param jobManagerMemoryMb JobManager heap size. + * @param jobManagerProcessSpec JobManager process memory spec. * @param configDirectory The configuration directory for the flink-conf.yaml * @param logDirectory The log directory. * @param hasLogback Uses logback? @@ -77,14 +78,13 @@ public class JavaCmdJobManagerDecorator extends AbstractKubernetesStepDecorator */ private static String getJobManagerStartCommand( Configuration flinkConfig, - int jobManagerMemoryMb, + ProcessMemorySpec jobManagerProcessSpec, String configDirectory, String logDirectory, boolean hasLogback, boolean hasLog4j, String mainClass) { - final int heapSize = BootstrapTools.calculateHeapSize(jobManagerMemoryMb, flinkConfig); - final String jvmMemOpts = String.format("-Xms%sm -Xmx%sm", heapSize, heapSize); + final String jvmMemOpts = ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec); return KubernetesUtils.getCommonStartCommand( flinkConfig, KubernetesUtils.ClusterComponent.JOB_MANAGER, diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java index 6540cdb..d4e81d1 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java @@ -30,6 +30,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import 
org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; import java.util.Collections; import java.util.HashMap; @@ -89,6 +90,10 @@ public class KubernetesJobManagerParameters extends AbstractKubernetesParameters return JOB_MANAGER_MAIN_CONTAINER_NAME; } + public JobManagerProcessSpec getJobManagerProcessSpec() { + return clusterSpecification.getMasterProcessSpec(); + } + public int getJobManagerMemoryMB() { return clusterSpecification.getMasterMemoryMB(); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java index a7bf478..df7fe6a 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java @@ -84,7 +84,7 @@ public class KubernetesSessionCliTest { "-e", KubernetesSessionClusterExecutor.NAME, "-D" + TaskManagerOptions.NUM_TASK_SLOTS.key() + "=3"}; - final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithTmTotalMemory(1234); + final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1234); final Configuration executorConfig = cli.getEffectiveConfiguration(params); final ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig); @@ -96,7 +96,7 @@ public class KubernetesSessionCliTest { @Test public void testResumeFromKubernetesID() throws Exception { - final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithTmTotalMemory(1024); + final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024); final String clusterId = "my-test-CLUSTER_ID"; final String[] args = new String[] { @@ -120,13 +120,13 @@ public class KubernetesSessionCliTest { final int taskManagerMemory = 7331; final int slotsPerTaskManager = 30; - configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
MemorySize.ofMebiBytes(jobManagerMemory)); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemory)); configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager); final String[] args = { "-e", KubernetesSessionClusterExecutor.NAME, - "-D" + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key() + "=" + jobManagerMemory + "m", + "-D" + JobManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=" + jobManagerMemory + "m", "-D" + TaskManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=" + taskManagerMemory + "m", "-D" + TaskManagerOptions.NUM_TASK_SLOTS.key() + "=" + slotsPerTaskManager }; @@ -150,7 +150,7 @@ public class KubernetesSessionCliTest { public void testConfigurationClusterSpecification() throws Exception { final Configuration configuration = new Configuration(); final int jobManagerMemory = 1337; - configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory)); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory)); final int taskManagerMemory = 7331; configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemory)); final int slotsPerTaskManager = 42; @@ -175,11 +175,11 @@ public class KubernetesSessionCliTest { public void testHeapMemoryPropertyWithUnitMB() throws Exception { final String[] args = new String[] { "-e", KubernetesSessionClusterExecutor.NAME, - "-D" + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key() + "=1024m", + "-D" + JobManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=1024m", "-D" + TaskManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=2048m" }; - final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithTmTotalMemory(1024); + final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024); final Configuration executorConfig = 
cli.getEffectiveConfiguration(args); final ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig); @@ -196,11 +196,11 @@ public class KubernetesSessionCliTest { public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception { final String[] args = new String[] { "-e", KubernetesSessionClusterExecutor.NAME, - "-D" + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key() + "=1g", + "-D" + JobManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=1g", "-D" + TaskManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=3g" }; - final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithTmTotalMemory(1024); + final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024); final Configuration executorConfig = cli.getEffectiveConfiguration(args); final ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig); @@ -239,7 +239,7 @@ public class KubernetesSessionCliTest { "-e", KubernetesSessionClusterExecutor.NAME }; - final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithTmTotalMemory(1024); + final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024); final Configuration executorConfig = cli.getEffectiveConfiguration(args); final ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig); @@ -254,8 +254,9 @@ public class KubernetesSessionCliTest { return clusterClientServiceLoader.getClusterClientFactory(executorConfig); } - private KubernetesSessionCli createFlinkKubernetesCustomCliWithTmTotalMemory(int totalMemory) { + private KubernetesSessionCli createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(int totalMemory) { Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemory)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemory)); return new KubernetesSessionCli(configuration); } diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java index c063a6b..3c03f7b 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesJobManagerTestBase.java @@ -26,12 +26,15 @@ import org.apache.flink.configuration.RestOptions; import org.apache.flink.kubernetes.KubernetesTestBase; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; import org.junit.Before; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec; + /** * Base test class for the JobManager side. 
*/ @@ -39,6 +42,7 @@ public class KubernetesJobManagerTestBase extends KubernetesTestBase { protected static final double JOB_MANAGER_CPU = 2.0; protected static final int JOB_MANAGER_MEMORY = 768; + protected static final JobManagerProcessSpec JOB_MANAGER_PROCESS_SPEC = createDefaultJobManagerProcessSpec(JOB_MANAGER_MEMORY); protected static final int REST_PORT = 9081; protected static final int RPC_PORT = 7123; @@ -83,7 +87,7 @@ public class KubernetesJobManagerTestBase extends KubernetesTestBase { this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, nodeSelector); final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(JOB_MANAGER_MEMORY) + .setMasterProcessSpec(JOB_MANAGER_PROCESS_SPEC) .setTaskManagerMemoryMB(1024) .setSlotsPerTaskManager(3) .createClusterSpecification(); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java index 36ccd1e..fbf0295 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; +import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; import io.fabric8.kubernetes.api.model.Container; import org.junit.Before; @@ -63,8 +64,7 @@ public class JavaCmdJobManagerDecoratorTest extends KubernetesJobManagerTestBase FLINK_LOG_DIR_IN_POD, 
FLINK_LOG_DIR_IN_POD); // Memory variables - private static final String jmJvmMem = String.format("-Xms%dm -Xmx%dm", - JOB_MANAGER_MEMORY - 600, JOB_MANAGER_MEMORY - 600); + private static final String jmJvmMem = ProcessMemoryUtils.generateJvmParametersStr(JOB_MANAGER_PROCESS_SPEC); private JavaCmdJobManagerDecorator javaCmdJobManagerDecorator; diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java index 00f3bae..91f874b 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java @@ -27,6 +27,8 @@ import org.apache.flink.kubernetes.KubernetesTestBase; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; +import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils; import org.apache.flink.util.FlinkRuntimeException; import org.junit.Test; @@ -47,11 +49,11 @@ import static org.junit.Assert.fail; */ public class KubernetesJobManagerParametersTest extends KubernetesTestBase { - private static final int JOB_MANAGER_MEMORY = 768; + private static final JobManagerProcessSpec JOB_MANAGER_PROCESS_SPEC = JobManagerProcessUtils.createDefaultJobManagerProcessSpec(768); private static final double JOB_MANAGER_CPU = 2.0; private final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(JOB_MANAGER_MEMORY) + .setMasterProcessSpec(JOB_MANAGER_PROCESS_SPEC) .setTaskManagerMemoryMB(1024) 
.setSlotsPerTaskManager(1) .createClusterSpecification(); @@ -93,7 +95,7 @@ public class KubernetesJobManagerParametersTest extends KubernetesTestBase { @Test public void testGetJobManagerMemoryMB() { - assertEquals(JOB_MANAGER_MEMORY, kubernetesJobManagerParameters.getJobManagerMemoryMB()); + assertThat(kubernetesJobManagerParameters.getJobManagerProcessSpec(), is(JOB_MANAGER_PROCESS_SPEC)); } @Test
