gyfora commented on code in PR #23768:
URL: https://github.com/apache/flink/pull/23768#discussion_r1402993805


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java:
##########
@@ -367,10 +367,24 @@ public static ResourceRequirements getResourceRequirements(
             Map<String, ExternalResource> externalResources,
             Map<String, String> externalResourceConfigKeys) {
         final Quantity cpuQuantity = new Quantity(String.valueOf(cpu));
-        final Quantity cpuLimitQuantity = new Quantity(String.valueOf(cpu * cpuLimitFactor));
+        final Quantity cpuLimitQuantity =
+                new Quantity(
+                        String.valueOf(
+                                getLimit(
+                                        cpu,
+                                        cpuLimitFactor,
+                                        resourceRequirements,
+                                        Constants.RESOURCE_NAME_CPU)));
         final Quantity memQuantity = new Quantity(mem + Constants.RESOURCE_UNIT_MB);

Review Comment:
   It seems inconsistent, and arguably wrong, to take the value for the limit 
from the podTemplate but not for the quantity.
   I think we should make a decision and do it either for both or for neither.



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java:
##########
@@ -399,6 +413,40 @@ public static ResourceRequirements getResourceRequirements(
         return resourceRequirementsBuilder.build();
     }
 
+    /**
+     * Calculates the final limit value for a resource based on the new value, limit factor, and
+     * existing resource requirements.
+     *
+     * @param newValue The new value for the resource.
+     * @param limitFactor The limit factor for the resource.
+     * @param resourceRequirements Existing resource requirements.
+     * @param resourceName The name of the resource.
+     * @return The final limit value for the resource.
+     */
+    private static double getLimit(
+            double newValue,
+            double limitFactor,
+            ResourceRequirements resourceRequirements,
+            String resourceName) {
+        Map<String, Quantity> limits = resourceRequirements.getLimits();
+        double limit = newValue * limitFactor;
+        if (limits != null) {
+            Quantity quantity = limits.get(resourceName);
+            if (quantity != null) {
+                try {
+                    return Math.max(Double.parseDouble(quantity.getAmount()), limit);

Review Comment:
   We should always log at info level when we use the limit from the 
podTemplate instead of the config, to avoid confusion.
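The logging suggested here could look roughly like the sketch below. It is a stdlib-only illustration, not the PR's code: the class `PodTemplateLimitSketch` and the method `resolveLimit` are hypothetical names, the limits are modelled as a plain `Map<String, String>` instead of the Kubernetes `ResourceRequirements`/`Quantity` types, and `java.util.logging` stands in for the project's logger. The fallback to the configured limit on a parse failure is an assumption, since the hunk above is cut off before the `catch` block.

```java
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical stand-in for the getLimit helper in the diff; not the PR's actual code. */
final class PodTemplateLimitSketch {
    private static final Logger LOG =
            Logger.getLogger(PodTemplateLimitSketch.class.getName());

    /**
     * Returns the larger of the configured limit (value * limitFactor) and the limit found in
     * the pod template, logging at INFO level whenever the pod template value is used.
     */
    static double resolveLimit(
            double value,
            double limitFactor,
            Map<String, String> templateLimits,
            String resourceName) {
        final double configuredLimit = value * limitFactor;
        if (templateLimits == null) {
            return configuredLimit;
        }
        final String templateAmount = templateLimits.get(resourceName);
        if (templateAmount == null) {
            return configuredLimit;
        }
        try {
            final double templateLimit = Double.parseDouble(templateAmount);
            if (templateLimit > configuredLimit) {
                // Make it visible to the user that the config value is not the one in effect.
                LOG.info(
                        String.format(
                                "Using %s limit %s from the pod template instead of the "
                                        + "configured limit %s.",
                                resourceName, templateLimit, configuredLimit));
                return templateLimit;
            }
        } catch (NumberFormatException e) {
            // Assumption: an unparsable template amount falls back to the configured limit.
            LOG.warning(
                    String.format(
                            "Ignoring unparsable %s limit '%s' from the pod template.",
                            resourceName, templateAmount));
        }
        return configuredLimit;
    }
}
```

With this shape, the message is only emitted when the pod template value actually wins over `value * limitFactor`, so the log stays quiet in the common case where the config is in effect.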



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/utils/KubernetesUtilsTest.java:
##########
@@ -143,6 +146,21 @@ void testLoadPodFromTemplateAndCheckInitContainer() {
                 .isEqualTo(KubernetesPodTemplateTestUtils.createInitContainer());
     }
 
+    @Test
+    void testPodTemplateResourceLimitUtilisation() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesPodTemplateTestUtils.getPodTemplateFileWithResourceLimit(),
+                        KubernetesPodTemplateTestUtils.TESTING_MAIN_CONTAINER_NAME);

Review Comment:
   We need to have tests for the different scenarios (covered in the logic):
    - limits specified but no memory in them
    - incorrect format in memory limit
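The scenarios listed above could be pinned down along the following lines. This is only a sketch under stated assumptions: `LimitScenarioSketch`, `limitOrDefault`, and `check` are hypothetical names, the helper is a simplified stand-in for the logic under review (plain string maps instead of the Kubernetes model classes), and the expected fallback to the configured limit for a malformed value is assumed rather than taken from the PR. In the real test class these checks would presumably be written as JUnit tests against `KubernetesUtils`.

```java
import java.util.Map;

/** Hypothetical scenario checks; the helper is a simplified stand-in for the reviewed logic. */
final class LimitScenarioSketch {

    /** Returns the template limit if present, parsable, and larger; else the configured limit. */
    static double limitOrDefault(
            Map<String, String> templateLimits, String resourceName, double configuredLimit) {
        if (templateLimits == null || templateLimits.get(resourceName) == null) {
            return configuredLimit;
        }
        try {
            return Math.max(
                    Double.parseDouble(templateLimits.get(resourceName)), configuredLimit);
        } catch (NumberFormatException e) {
            // Assumed behaviour: a malformed amount is ignored.
            return configuredLimit;
        }
    }

    /** Minimal assertion helper so the sketch needs no test framework. */
    static void check(boolean condition, String scenario) {
        if (!condition) {
            throw new AssertionError("Scenario failed: " + scenario);
        }
    }

    static void runScenarios() {
        final double configured = 1024.0;

        // Limits are specified in the template, but there is no memory entry among them.
        check(
                limitOrDefault(Map.of("cpu", "2"), "memory", configured) == configured,
                "limits without memory");

        // The memory limit is present but not in a parsable format.
        check(
                limitOrDefault(Map.of("memory", "not-a-number"), "memory", configured)
                        == configured,
                "incorrect memory format");

        // Control case: a valid, larger memory limit from the template is used.
        check(
                limitOrDefault(Map.of("memory", "2048"), "memory", configured) == 2048.0,
                "valid larger memory limit");
    }
}
```

Calling `LimitScenarioSketch.runScenarios()` completes silently when all three scenarios hold and throws an `AssertionError` naming the first scenario that does not.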



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
