This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 83f92bf3 [FLINK-31871] Interpret Flink MemoryUnits according to the
actual user input
83f92bf3 is described below
commit 83f92bf3e062410c0e23586580df21e646e297b7
Author: srpraneeth <[email protected]>
AuthorDate: Tue Sep 19 22:56:32 2023 -0700
[FLINK-31871] Interpret Flink MemoryUnits according to the actual user input
---
e2e-tests/data/flinkdep-cr.yaml | 2 +-
e2e-tests/data/sessionjob-cr.yaml | 2 +-
.../operator/config/FlinkConfigBuilder.java | 25 ++-
.../operator/validation/DefaultValidator.java | 4 +-
.../operator/config/FlinkConfigBuilderTest.java | 222 ++++++++++++++++++++-
.../operator/validation/DefaultValidatorTest.java | 4 +
6 files changed, 253 insertions(+), 6 deletions(-)
diff --git a/e2e-tests/data/flinkdep-cr.yaml b/e2e-tests/data/flinkdep-cr.yaml
index 05048c13..bbd963f2 100644
--- a/e2e-tests/data/flinkdep-cr.yaml
+++ b/e2e-tests/data/flinkdep-cr.yaml
@@ -72,7 +72,7 @@ spec:
cpu: 0.5
taskManager:
resource:
- memory: "1024m"
+ memory: "1Gi"
cpu: 0.5
job:
jarURI: local:///opt/flink/usrlib/myjob.jar
diff --git a/e2e-tests/data/sessionjob-cr.yaml
b/e2e-tests/data/sessionjob-cr.yaml
index ff469ab2..36d41adf 100644
--- a/e2e-tests/data/sessionjob-cr.yaml
+++ b/e2e-tests/data/sessionjob-cr.yaml
@@ -55,7 +55,7 @@ spec:
claimName: session-cluster-1-pvc
jobManager:
resource:
- memory: "1024m"
+ memory: "1Gi"
cpu: 0.5
taskManager:
resource:
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 0f1dcfc9..82ce46d0 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -420,13 +421,35 @@ public class FlinkConfigBuilder {
? JobManagerOptions.TOTAL_PROCESS_MEMORY
: TaskManagerOptions.TOTAL_PROCESS_MEMORY;
if (resource.getMemory() != null) {
- effectiveConfig.setString(memoryConfigOption.key(),
resource.getMemory());
+ effectiveConfig.setString(
+ memoryConfigOption.key(),
parseResourceMemoryString(resource.getMemory()));
}
configureCpu(resource, effectiveConfig, isJM);
}
}
+ // Using the K8s units specification for the JM and TM memory settings
+ public static String parseResourceMemoryString(String memory) {
+ try {
+ return MemorySize.parse(memory).toString();
+ } catch (IllegalArgumentException e) {
+ var memoryQuantity = formatMemoryStringForK8sSpec(memory);
+ return Quantity.parse(memoryQuantity).getNumericalAmount() + "";
+ }
+ }
+
+ private static String formatMemoryStringForK8sSpec(String memory) {
+ var memoryQuantity = memory.trim().replaceAll("\\s", "").toUpperCase();
+ if (memoryQuantity.endsWith("B")) {
+ memoryQuantity = memoryQuantity.substring(0,
memoryQuantity.length() - 1);
+ }
+ if (memoryQuantity.endsWith("I")) {
+ memoryQuantity = memoryQuantity.substring(0,
memoryQuantity.length() - 1) + "i";
+ }
+ return memoryQuantity;
+ }
+
private void configureCpu(Resource resource, Configuration conf, boolean
isJM) {
if (resource.getCpu() == null) {
return;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 3b087a44..bce7680f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -36,6 +36,7 @@ import org.apache.flink.kubernetes.operator.api.spec.Resource;
import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
@@ -367,7 +368,8 @@ public class DefaultValidator implements
FlinkResourceValidator {
if (memory != null) {
try {
- MemorySize.parse(memory);
+ MemorySize.parse(
+
FlinkConfigBuilder.parseResourceMemoryString(resource.getMemory()));
} catch (IllegalArgumentException iae) {
builder.append(component + " resource memory parse error: " +
iae.getMessage());
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index a2f14a36..14bbaf9e 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -429,7 +429,7 @@ public class FlinkConfigBuilderTest {
.build();
assertEquals(
- MemorySize.parse("2048m"),
+ MemorySize.parse("2 gb"),
configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
assertEquals(Double.valueOf(1),
configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
assertEquals(
@@ -472,12 +472,230 @@ public class FlinkConfigBuilderTest {
.build();
assertEquals(
- MemorySize.parse("2048m"),
+ MemorySize.parse("2 gb"),
configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
assertEquals(
Double.valueOf(1),
configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
}
+ @Test
+ public void testApplyJobManagerSpecWithBiByteMemorySetting() {
+ var resource = new Resource(1.0, "1Gi", "20Gi");
+ flinkDeployment.getSpec().getJobManager().setResource(resource);
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyJobManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("1 gb"),
+ configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith2048MiSetting() {
+ var resource = new Resource(1.0, "2048Mi", "20Gi");
+ flinkDeployment.getSpec().getTaskManager().setResource(resource);
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("2147483648 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith2gSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2g");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("2147483648 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith2giSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2gi");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("2147483648 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith2gbSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2gb");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("2 gb"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith2gibSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2gib");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("2 gb"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith2GSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2G");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("2 gb"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith2GiSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2Gi");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("2147483648 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith2_gSetting() {
+ flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2
g");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("2 gb"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith2_GiSetting() {
+ flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2
Gi");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("2147483648 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith512mSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("512m");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("512 mb"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith512miSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("512mi");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("536870912 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith1024mSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1024m");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("1 gb"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith1024miSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1024mi");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("1073741824 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith1000000bSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1000000b");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("1000000 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith1000000_bSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1000000 b");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("1000000 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith1e6Setting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1e6");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("1000000 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testTaskManagerSpecWith1e6_bSetting() {
+
flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1e6 b");
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("1000000 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
@Test
public void testTmEphemeralStorage() throws Exception {
FlinkDeployment deploymentClone =
ReconciliationUtils.clone(flinkDeployment);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 865a51a0..506f15d3 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -283,6 +283,10 @@ public class DefaultValidatorTest {
testSuccess(dep ->
dep.getSpec().getTaskManager().getResource().setMemory("1G"));
testSuccess(dep ->
dep.getSpec().getTaskManager().getResource().setMemory("100"));
+ // Test resource validation with k8s specification
+ testSuccess(dep ->
dep.getSpec().getJobManager().getResource().setMemory("1Gi"));
+ testSuccess(dep ->
dep.getSpec().getTaskManager().getResource().setMemory("1Gi"));
+
testError(
dep ->
dep.getSpec().getTaskManager().getResource().setMemory("invalid"),
"TaskManager resource memory parse error");