Dennis-Mircea commented on code in PR #1082:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/1082#discussion_r3436676045
##########
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobManagerSpec.java:
##########
@@ -36,8 +37,19 @@
@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
public class JobManagerSpec {
- /** Resource specification for the JobManager pods. */
- private Resource resource;
+ /**
+ * Resource specification for the JobManager pods.
+ *
+ * @deprecated Use resourceRequirements instead for proper Kubernetes
ResourceRequirements
Review Comment:
This @deprecated comment is ambiguous and not well formatted.
##########
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java:
##########
@@ -40,8 +41,18 @@
@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
public class TaskManagerSpec implements Diffable<TaskManagerSpec> {
- /** Resource specification for the TaskManager pods. */
- private Resource resource;
+ /**
+ * Resource specification for the TaskManager pods.
+ *
+ * @deprecated Use resources instead for proper Kubernetes
ResourceRequirements schema.
Review Comment:
This @deprecated comment is ambiguous.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -196,6 +198,61 @@ protected FlinkConfigBuilder applyLogConfiguration()
throws IOException {
return this;
}
+ @VisibleForTesting
+ protected static PodTemplateSpec applyResourceRequirementsToPodTemplate(
+ PodTemplateSpec podTemplate, ResourceRequirements requirements) {
+ if (requirements == null
+ || (requirements.getRequests().isEmpty() &&
requirements.getLimits().isEmpty())) {
+ return podTemplate;
+ }
+
+ PodTemplateSpec outPodTemplate =
+ (podTemplate == null)
+ ? new PodTemplateSpec()
+ : ReconciliationUtils.clone(podTemplate);
+
+ if (outPodTemplate.getSpec() == null) {
+ outPodTemplate.setSpec(new PodSpec());
+ }
+ PodSpec podSpec = outPodTemplate.getSpec();
+
+ Optional<Container> mainContainerOpt =
+ podSpec.getContainers().stream()
+ .filter(c ->
c.getName().equals(Constants.MAIN_CONTAINER_NAME))
+ .findFirst();
+
+ Container mainContainer;
+ if (mainContainerOpt.isPresent()) {
+ mainContainer = mainContainerOpt.get();
+ } else {
+ mainContainer = new Container();
+ mainContainer.setName(Constants.MAIN_CONTAINER_NAME);
+ podSpec.getContainers().add(mainContainer);
+ }
+
+ if (mainContainer.getResources() == null) {
+ mainContainer.setResources(new
io.fabric8.kubernetes.api.model.ResourceRequirements());
Review Comment:
Why to use here full package naming?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -196,6 +198,61 @@ protected FlinkConfigBuilder applyLogConfiguration()
throws IOException {
return this;
}
+ @VisibleForTesting
+ protected static PodTemplateSpec applyResourceRequirementsToPodTemplate(
+ PodTemplateSpec podTemplate, ResourceRequirements requirements) {
+ if (requirements == null
+ || (requirements.getRequests().isEmpty() &&
requirements.getLimits().isEmpty())) {
Review Comment:
Here you are not checking `requirements.getRequests() != null` in order to
have symmetry with `setResourceRequirements` check.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -269,9 +348,89 @@ protected FlinkConfigBuilder applyJobManagerSpec() {
return this;
}
+ /**
+ * Sets Flink configuration from Kubernetes ResourceRequirements, handling
both requests (for
+ * Flink config values) and limits (for computing limit factors).
+ *
+ * @param resourceRequirements the Kubernetes resource requirements
+ * @param effectiveConfig the Flink configuration to populate
+ * @param isJM true for JobManager, false for TaskManager
+ */
+ private void setResourceRequirements(
+ ResourceRequirements resourceRequirements,
+ Configuration effectiveConfig,
+ boolean isJM) {
+ if (resourceRequirements == null) {
+ return;
+ }
+
+ Map<String, Quantity> requests = resourceRequirements.getRequests();
+ Map<String, Quantity> limits = resourceRequirements.getLimits();
+
+ if ((requests == null || requests.isEmpty()) && (limits == null ||
limits.isEmpty())) {
+ return;
+ }
+
+ var memoryConfigOption =
+ isJM
+ ? JobManagerOptions.TOTAL_PROCESS_MEMORY
+ : TaskManagerOptions.TOTAL_PROCESS_MEMORY;
+
+ // Handle memory from requests
+ if (requests != null && requests.containsKey("memory")) {
+ String memoryValue = requests.get("memory").toString();
+ effectiveConfig.setString(
+ memoryConfigOption.key(),
parseResourceMemoryString(memoryValue));
+ }
+
+ // Handle CPU from requests
+ if (requests != null && requests.containsKey("cpu")) {
+ String cpuValue = requests.get("cpu").toString();
+ configureCpuFromString(cpuValue, effectiveConfig, isJM);
+ }
+
+ // Handle CPU limit factor: limits.cpu / requests.cpu
+ if (limits != null
Review Comment:
With the current implementation if only `limits` is set without `requests`
the implementation doesn't do anything. Kubernetes itself defaults requests =
limits when requests are omitted.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -480,6 +639,49 @@ private void configureCpu(Resource resource, Configuration
conf, boolean isJM) {
}
}
+ private void configureCpuFromString(String cpuValue, Configuration conf,
boolean isJM) {
+ if (StringUtils.isNullOrWhitespaceOnly(cpuValue)) {
+ return;
+ }
+
+ try {
+ double cpuDouble = parseCpuValue(cpuValue);
+ configureCpu(cpuDouble, conf, isJM);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Could not parse CPU value '{}'.", cpuValue, e);
+ }
+ }
+
+ private void configureCpu(Double cpu, Configuration conf, boolean isJM) {
+ if (cpu == null) {
+ return;
+ }
+
+ ConfigOption<Double> cpuConfigOption =
+ isJM
+ ? KubernetesConfigOptions.JOB_MANAGER_CPU
+ : KubernetesConfigOptions.TASK_MANAGER_CPU;
+ conf.set(cpuConfigOption, cpu);
+
+ if (!spec.getFlinkVersion().isEqualOrNewer(FlinkVersion.v1_17)) {
+ String legacyKey = isJM ? "kubernetes.jobmanager.cpu" :
"kubernetes.taskmanager.cpu";
+ conf.setDouble(legacyKey, cpu);
+ }
+ }
+
+ private static double parseCpuValue(String cpuValue) {
Review Comment:
In the DefaultValidator you using already `Quantity.parse` for CPU. Why not
to use it also here?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -207,8 +264,16 @@ protected FlinkConfigBuilder applyPodTemplate() throws
IOException {
mergePodTemplates(
commonPodTemplate,
spec.getJobManager().getPodTemplate(), mergeByName);
- jmPodTemplate =
- applyResourceToPodTemplate(jmPodTemplate,
spec.getJobManager().getResource());
+ // Prioritize new ResourceRequirements, then fall back to old
Resource
Review Comment:
The comment is wrong, the if-else branch is not equal with a "fall back"
mechanism.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -232,8 +297,17 @@ protected FlinkConfigBuilder applyPodTemplate() throws
IOException {
tmPodTemplate =
mergePodTemplates(
commonPodTemplate,
spec.getTaskManager().getPodTemplate(), mergeByName);
- tmPodTemplate =
- applyResourceToPodTemplate(tmPodTemplate,
spec.getTaskManager().getResource());
+
+ // Prioritize new ResourceRequirements, then fall back to old
Resource
Review Comment:
The comment is wrong, the if-else branch is not equal with a "fall back"
mechanism.
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java:
##########
@@ -561,6 +565,162 @@ public void testTaskManagerSpecWith2_GiSetting() {
configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
}
+ @Test
+ public void testTaskManagerSpecWith2_GiResourceRequirementsSetting() {
+ Map<String, Quantity> requests = new HashMap<>();
+ requests.put("memory", new Quantity("2Gi"));
+ ResourceRequirements resourceRequirements =
+ new
ResourceRequirementsBuilder().withRequests(requests).build();
+
flinkDeployment.getSpec().getTaskManager().setResources(resourceRequirements);
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("2147483648 b"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ }
+
+ @Test
+ public void testApplyJobManagerSpecWithResourceRequirements() {
+ Map<String, Quantity> requests = new HashMap<>();
+ requests.put("memory", new Quantity("4Gi"));
+ requests.put("cpu", new Quantity("2"));
+ ResourceRequirements rr = new
ResourceRequirementsBuilder().withRequests(requests).build();
+ flinkDeployment.getSpec().getJobManager().setResources(rr);
+ flinkDeployment.getSpec().getJobManager().setResource(null);
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyJobManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("4294967296 b"),
+ configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
+ assertEquals(
+ Double.valueOf(2.0),
configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
+ }
+
+ @Test
+ public void testApplyTaskManagerSpecWithCpuResourceRequirements() {
+ Map<String, Quantity> requests = new HashMap<>();
+ requests.put("memory", new Quantity("2Gi"));
+ requests.put("cpu", new Quantity("500m"));
+ ResourceRequirements rr = new
ResourceRequirementsBuilder().withRequests(requests).build();
+ flinkDeployment.getSpec().getTaskManager().setResources(rr);
+ flinkDeployment.getSpec().getTaskManager().setResource(null);
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ Double.valueOf(0.5),
configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
+ }
+
+ @Test
+ public void testResourceRequirementsCpuLimitFactor() {
+ Map<String, Quantity> requests = new HashMap<>();
+ requests.put("memory", new Quantity("2Gi"));
+ requests.put("cpu", new Quantity("1"));
+ Map<String, Quantity> limits = new HashMap<>();
+ limits.put("cpu", new Quantity("2"));
+ ResourceRequirements rr =
+ new
ResourceRequirementsBuilder().withRequests(requests).withLimits(limits).build();
+ flinkDeployment.getSpec().getJobManager().setResources(rr);
+ flinkDeployment.getSpec().getJobManager().setResource(null);
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyJobManagerSpec()
+ .build();
+ assertEquals(
+ Double.valueOf(2.0),
+
configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU_LIMIT_FACTOR));
+ }
+
+ @Test
+ public void testResourceRequirementsMemoryLimitFactor() {
+ Map<String, Quantity> requests = new HashMap<>();
+ requests.put("memory", new Quantity("2Gi"));
+ requests.put("cpu", new Quantity("1"));
+ Map<String, Quantity> limits = new HashMap<>();
+ limits.put("memory", new Quantity("4Gi"));
+ ResourceRequirements rr =
+ new
ResourceRequirementsBuilder().withRequests(requests).withLimits(limits).build();
+ flinkDeployment.getSpec().getTaskManager().setResources(rr);
+ flinkDeployment.getSpec().getTaskManager().setResource(null);
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyTaskManagerSpec()
+ .build();
+ assertEquals(
+ Double.valueOf(2.0),
+
configuration.get(KubernetesConfigOptions.TASK_MANAGER_MEMORY_LIMIT_FACTOR));
+ }
+
+ @Test
+ public void testResourceRequirementsTakesPrecedenceOverResource() {
+ flinkDeployment.getSpec().getJobManager().setResource(new
Resource(1.0, "1024m", null));
+ Map<String, Quantity> requests = new HashMap<>();
+ requests.put("memory", new Quantity("4Gi"));
+ requests.put("cpu", new Quantity("2"));
+ flinkDeployment
+ .getSpec()
+ .getJobManager()
+ .setResources(new
ResourceRequirementsBuilder().withRequests(requests).build());
+ Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyJobManagerSpec()
+ .build();
+ assertEquals(
+ MemorySize.parse("4294967296 b"),
+ configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
+ assertEquals(
+ Double.valueOf(2.0),
configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
+ }
+
+ @Test
+ public void testApplyResourceRequirementsToPodTemplate() {
Review Comment:
It will be good to have a UT testing mixed units (e.g. `1Gi` request vs
`2048Mi` limit, and `500m` vs `1` for CPU).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]