This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 83f92bf3 [FLINK-31871] Interpret Flink MemoryUnits according to the actual user input
83f92bf3 is described below

commit 83f92bf3e062410c0e23586580df21e646e297b7
Author: srpraneeth <[email protected]>
AuthorDate: Tue Sep 19 22:56:32 2023 -0700

    [FLINK-31871] Interpret Flink MemoryUnits according to the actual user input
---
 e2e-tests/data/flinkdep-cr.yaml                    |   2 +-
 e2e-tests/data/sessionjob-cr.yaml                  |   2 +-
 .../operator/config/FlinkConfigBuilder.java        |  25 ++-
 .../operator/validation/DefaultValidator.java      |   4 +-
 .../operator/config/FlinkConfigBuilderTest.java    | 222 ++++++++++++++++++++-
 .../operator/validation/DefaultValidatorTest.java  |   4 +
 6 files changed, 253 insertions(+), 6 deletions(-)

diff --git a/e2e-tests/data/flinkdep-cr.yaml b/e2e-tests/data/flinkdep-cr.yaml
index 05048c13..bbd963f2 100644
--- a/e2e-tests/data/flinkdep-cr.yaml
+++ b/e2e-tests/data/flinkdep-cr.yaml
@@ -72,7 +72,7 @@ spec:
       cpu: 0.5
   taskManager:
     resource:
-      memory: "1024m"
+      memory: "1Gi"
       cpu: 0.5
   job:
     jarURI: local:///opt/flink/usrlib/myjob.jar
diff --git a/e2e-tests/data/sessionjob-cr.yaml b/e2e-tests/data/sessionjob-cr.yaml
index ff469ab2..36d41adf 100644
--- a/e2e-tests/data/sessionjob-cr.yaml
+++ b/e2e-tests/data/sessionjob-cr.yaml
@@ -55,7 +55,7 @@ spec:
             claimName: session-cluster-1-pvc
   jobManager:
     resource:
-      memory: "1024m"
+      memory: "1Gi"
       cpu: 0.5
   taskManager:
     resource:
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 0f1dcfc9..82ce46d0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -420,13 +421,35 @@ public class FlinkConfigBuilder {
                             ? JobManagerOptions.TOTAL_PROCESS_MEMORY
                             : TaskManagerOptions.TOTAL_PROCESS_MEMORY;
             if (resource.getMemory() != null) {
-                effectiveConfig.setString(memoryConfigOption.key(), resource.getMemory());
+                effectiveConfig.setString(
+                        memoryConfigOption.key(), parseResourceMemoryString(resource.getMemory()));
             }
 
             configureCpu(resource, effectiveConfig, isJM);
         }
     }
 
+    // Using the K8s units specification for the JM and TM memory settings
+    public static String parseResourceMemoryString(String memory) {
+        try {
+            return MemorySize.parse(memory).toString();
+        } catch (IllegalArgumentException e) {
+            var memoryQuantity = formatMemoryStringForK8sSpec(memory);
+            return Quantity.parse(memoryQuantity).getNumericalAmount() + "";
+        }
+    }
+
+    private static String formatMemoryStringForK8sSpec(String memory) {
+        var memoryQuantity = memory.trim().replaceAll("\\s", "").toUpperCase();
+        if (memoryQuantity.endsWith("B")) {
+            memoryQuantity = memoryQuantity.substring(0, memoryQuantity.length() - 1);
+        }
+        if (memoryQuantity.endsWith("I")) {
+            memoryQuantity = memoryQuantity.substring(0, memoryQuantity.length() - 1) + "i";
+        }
+        return memoryQuantity;
+    }
+
     private void configureCpu(Resource resource, Configuration conf, boolean isJM) {
         if (resource.getCpu() == null) {
             return;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 3b087a44..bce7680f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -36,6 +36,7 @@ import org.apache.flink.kubernetes.operator.api.spec.Resource;
 import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
@@ -367,7 +368,8 @@ public class DefaultValidator implements FlinkResourceValidator {
 
         if (memory != null) {
             try {
-                MemorySize.parse(memory);
+                MemorySize.parse(
+                        FlinkConfigBuilder.parseResourceMemoryString(resource.getMemory()));
             } catch (IllegalArgumentException iae) {
                 builder.append(component + " resource memory parse error: " + iae.getMessage());
             }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index a2f14a36..14bbaf9e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -429,7 +429,7 @@ public class FlinkConfigBuilderTest {
                         .build();
 
         assertEquals(
-                MemorySize.parse("2048m"),
+                MemorySize.parse("2 gb"),
                 configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
         assertEquals(Double.valueOf(1), configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
         assertEquals(
@@ -472,12 +472,230 @@ public class FlinkConfigBuilderTest {
                         .build();
 
         assertEquals(
-                MemorySize.parse("2048m"),
+                MemorySize.parse("2 gb"),
                 configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
         assertEquals(
                 Double.valueOf(1), configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
     }
 
+    @Test
+    public void testApplyJobManagerSpecWithBiByteMemorySetting() {
+        var resource = new Resource(1.0, "1Gi", "20Gi");
+        flinkDeployment.getSpec().getJobManager().setResource(resource);
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyJobManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("1 gb"),
+                configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith2048MiSetting() {
+        var resource = new Resource(1.0, "2048Mi", "20Gi");
+        flinkDeployment.getSpec().getTaskManager().setResource(resource);
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("2147483648 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith2gSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2g");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("2147483648 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith2giSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2gi");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("2147483648 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith2gbSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2gb");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("2 gb"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith2gibSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2gib");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("2 gb"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith2GSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2G");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("2 gb"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith2GiSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2Gi");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("2147483648 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith2_gSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2 g");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("2 gb"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith2_GiSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("2 Gi");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("2147483648 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith512mSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("512m");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("512 mb"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith512miSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("512mi");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("536870912 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith1024mSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1024m");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("1 gb"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith1024miSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1024mi");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("1073741824 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith1000000bSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1000000b");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("1000000 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith1000000_bSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1000000 b");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("1000000 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith1e6Setting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1e6");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("1000000 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
+    @Test
+    public void testTaskManagerSpecWith1e6_bSetting() {
+        flinkDeployment.getSpec().getTaskManager().getResource().setMemory("1e6 b");
+        Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
+        assertEquals(
+                MemorySize.parse("1000000 b"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+    }
+
     @Test
     public void testTmEphemeralStorage() throws Exception {
         FlinkDeployment deploymentClone = ReconciliationUtils.clone(flinkDeployment);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 865a51a0..506f15d3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -283,6 +283,10 @@ public class DefaultValidatorTest {
         testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("1G"));
         testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("100"));
 
+        // Test resource validation with k8s specification
+        testSuccess(dep -> dep.getSpec().getJobManager().getResource().setMemory("1Gi"));
+        testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("1Gi"));
+
         testError(
                 dep -> dep.getSpec().getTaskManager().getResource().setMemory("invalid"),
                 "TaskManager resource memory parse error");

Reply via email to