mateczagany commented on code in PR #762:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1477106598
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -106,7 +107,13 @@ public void scale(Context ctx) throws Exception {
} catch (Throwable e) {
onError(ctx, autoscalerMetrics, e);
} finally {
- applyParallelismOverrides(ctx);
+ try {
+ applyParallelismOverrides(ctx);
+ applyConfigOverrides(ctx);
Review Comment:
This seems to cause issues when both the parallelism and the memory
configuration change while in-place scaling is enabled. I couldn't figure
out why yet, but I'll look into it next week.
I see the following error coming from the reconciliation loop:
```
Could not create Kubernetes cluster "test-app-scaling"
...
Message: object is being deleted: deployments.apps \"test-app-scaling\" already exists.
```
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java:
##########
@@ -43,6 +53,32 @@ public void realize(
getOverrideString(context, parallelismOverrides));
}
+ @Override
+ public void realizeMemoryOverrides(
+ KubernetesJobAutoScalerContext context, Configuration configOverrides) {
+
+ if (context.getResource() instanceof FlinkDeployment) {
+ var flinkDeployment = ((FlinkDeployment) context.getResource());
+
+ flinkDeployment.getSpec().getFlinkConfiguration().putAll(configOverrides.toMap());
+
+ var totalMemoryOverride = MemoryTuningUtils.getTotalMemory(configOverrides, context);
+ if (totalMemoryOverride.compareTo(MemorySize.ZERO) > 0) {
+ Resource tmResource = flinkDeployment.getSpec().getTaskManager().getResource();
+ // Make sure to support the Kubernetes syntax here, which supports more formats than
+ // Flink's classes.
+ var currentMemory =
+ new MemorySize(
+ (Quantity.parse(tmResource.getMemory()).getNumericalAmount())
Review Comment:
This seems to break existing deployments that use the `2048m` format, which is
valid in Flink but in Kubernetes means 2048 * 0.001 bytes (`m` is the milli suffix there):
https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory
It also seems to break the `2048M` format, since `M` means mebibytes in
Flink but megabytes in Kubernetes.
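To make the mismatch concrete, here is a minimal Java sketch that reads the same string under both sets of rules. `MemoryUnitComparison`, `flinkBytes` and `kubernetesBytes` are hypothetical helpers that cover only a subset of each syntax (integer amounts, no exponents); they are not Flink's `MemorySize.parse` or fabric8's `Quantity.parse`.

```java
import java.math.BigDecimal;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical comparison of Flink MemorySize-style units and Kubernetes quantity suffixes.
 * Both tables are simplified subsets of the real syntaxes.
 */
public class MemoryUnitComparison {

    // Integer amount immediately followed by an optional alphabetic unit, e.g. "2048m".
    private static final Pattern AMOUNT = Pattern.compile("(\\d+)([a-zA-Z]*)");

    // Flink-style: binary multiples, unit matched case-insensitively ("m" and "M" are both MiB).
    private static final Map<String, Long> FLINK_UNITS = Map.of(
            "", 1L, "b", 1L, "k", 1L << 10, "kb", 1L << 10,
            "m", 1L << 20, "mb", 1L << 20, "g", 1L << 30, "gb", 1L << 30);

    // Kubernetes-style: case-sensitive; "m" is milli, "M" is 10^6, "Mi" is 2^20.
    private static final Map<String, BigDecimal> K8S_SUFFIXES = Map.of(
            "", BigDecimal.ONE,
            "m", new BigDecimal("0.001"),
            "k", BigDecimal.valueOf(1_000L),
            "M", BigDecimal.valueOf(1_000_000L),
            "G", BigDecimal.valueOf(1_000_000_000L),
            "Ki", BigDecimal.valueOf(1L << 10),
            "Mi", BigDecimal.valueOf(1L << 20),
            "Gi", BigDecimal.valueOf(1L << 30));

    /** Bytes under the Flink-style rules above. */
    static long flinkBytes(String text) {
        Matcher m = match(text);
        Long unit = FLINK_UNITS.get(m.group(2).toLowerCase(Locale.ROOT));
        if (unit == null) {
            throw new IllegalArgumentException("Not a Flink memory unit: " + text);
        }
        return Math.multiplyExact(Long.parseLong(m.group(1)), unit);
    }

    /** Bytes under the Kubernetes-style rules above; may be fractional (e.g. milli). */
    static BigDecimal kubernetesBytes(String text) {
        Matcher m = match(text);
        BigDecimal suffix = K8S_SUFFIXES.get(m.group(2));
        if (suffix == null) {
            throw new IllegalArgumentException("Not a Kubernetes suffix: " + text);
        }
        return new BigDecimal(m.group(1)).multiply(suffix);
    }

    private static Matcher match(String text) {
        Matcher m = AMOUNT.matcher(text.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unparseable amount: " + text);
        }
        return m;
    }

    public static void main(String[] args) {
        for (String s : new String[] {"2048m", "2048M"}) {
            System.out.printf("%s: Flink=%d bytes, Kubernetes=%s bytes%n",
                    s, flinkBytes(s), kubernetesBytes(s).toPlainString());
        }
    }
}
```

Running `main` reports 2147483648 bytes on the Flink side for both strings, versus 2.048 bytes for `2048m` and 2048000000 bytes for `2048M` on the Kubernetes side.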
I think we should either stick to Flink's memory syntax or add some extra logic
here to make sure that existing deployments using these formats won't break.
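One possible shape for the "extra logic" option, sketched under the assumption that Flink's syntax should win for any string it can parse, so Kubernetes quantity syntax is only a fallback for strings Flink rejects (such as `2Gi` or `1.5Gi`). `CompatibleMemoryParser` and `parseBytes` are hypothetical names, and both parsers are simplified subsets of the real syntaxes.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical backward-compatible parser: Flink-style units first,
 * Kubernetes binary suffixes only when the Flink-style parse fails.
 */
public class CompatibleMemoryParser {

    // Flink-style: integer amount plus optional case-insensitive unit (subset).
    private static final Pattern FLINK = Pattern.compile("(\\d+)\\s*([a-zA-Z]*)");
    private static final Map<String, Long> FLINK_UNITS = Map.of(
            "", 1L, "b", 1L, "k", 1L << 10, "kb", 1L << 10,
            "m", 1L << 20, "mb", 1L << 20, "g", 1L << 30, "gb", 1L << 30);

    // Kubernetes-style fallback: decimal amount plus a binary suffix (subset).
    private static final Pattern K8S = Pattern.compile("(\\d+(?:\\.\\d+)?)(Ki|Mi|Gi|Ti)");
    private static final Map<String, Long> K8S_SUFFIXES = Map.of(
            "Ki", 1L << 10, "Mi", 1L << 20, "Gi", 1L << 30, "Ti", 1L << 40);

    static long parseBytes(String raw) {
        String text = raw.trim();
        Matcher flink = FLINK.matcher(text);
        if (flink.matches()) {
            Long unit = FLINK_UNITS.get(flink.group(2).toLowerCase(Locale.ROOT));
            if (unit != null) {
                // Flink meaning is kept, so "2048m" and "2048M" both stay 2048 MiB.
                return Math.multiplyExact(Long.parseLong(flink.group(1)), unit);
            }
        }
        Matcher k8s = K8S.matcher(text);
        if (k8s.matches()) {
            // Fractional byte counts are rounded up to a whole byte.
            return new BigDecimal(k8s.group(1))
                    .multiply(BigDecimal.valueOf(K8S_SUFFIXES.get(k8s.group(2))))
                    .setScale(0, RoundingMode.CEILING)
                    .longValueExact();
        }
        throw new IllegalArgumentException("Unsupported memory format: " + raw);
    }

    public static void main(String[] args) {
        for (String s : new String[] {"2048m", "2048M", "2g", "2Gi", "1.5Gi"}) {
            System.out.println(s + " -> " + parseBytes(s) + " bytes");
        }
    }
}
```

The trade-off of this ordering is that strings valid in both syntaxes, such as `2G`, keep Flink's binary meaning (2 GiB) rather than the Kubernetes decimal one (2 * 10^9 bytes).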