gyfora commented on code in PR #633:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/633#discussion_r1266827140
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -332,29 +321,55 @@ private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployCon
return false;
}
+ /**
+ * If there are any parallelism overrides by the {@link JobAutoScaler} apply them to the spec.
+ *
+ * @param autoscalerOverrides Parallelism overrides initiated by the autoscaler
+ * @param spec Current user spec
+ */
+ private void applyAutoscalerParallelismOverrides(
+ Map<String, String> autoscalerOverrides, SPEC spec) {
+
+ if (autoscalerOverrides.isEmpty()) {
+ return;
+ }
+
+ LOG.debug("Applying autoscaler parallelism overrides: {}", autoscalerOverrides);
+
+ var configMap = spec.getFlinkConfiguration();
+ var userOverridesStr =
+ configMap.getOrDefault(PipelineOptions.PARALLELISM_OVERRIDES.key(), "");
+ var userOverrides =
+ new HashMap<>(
+ ConfigurationUtils.<Map<String, String>>convertValue(
+ userOverridesStr, Map.class));
+
+ autoscalerOverrides.forEach(userOverrides::put);
Review Comment:
The user might use the parallelism override config to set the parallelism of
other vertices (or the defaults for the jobgraph). It is also better to merge
into the user's overrides rather than replace them, in case the jobgraph
changes and the user has defined new overrides for vertices that the
autoscaler is not yet aware of.
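
A minimal, standalone sketch of the merge behaviour described above, not the
operator's actual code. The class `ParallelismOverrideMerge` and its `parse`
and `merge` helpers are hypothetical names, and the `vertexId:parallelism`
comma-separated string format is assumed here for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

public class ParallelismOverrideMerge {

    // Hypothetical helper: parses "vertexA:2,vertexC:4" into a map.
    // An empty string yields an empty map. The format is an assumption.
    static Map<String, String> parse(String overrides) {
        Map<String, String> result = new HashMap<>();
        for (String entry : overrides.split(",")) {
            String[] kv = entry.split(":", 2);
            if (kv.length == 2 && !kv[0].trim().isEmpty()) {
                result.put(kv[0].trim(), kv[1].trim());
            }
        }
        return result;
    }

    // Starts from the user's overrides and layers the autoscaler's on top.
    // Autoscaler values win for vertices it knows about; user entries for
    // any other vertex are kept untouched.
    static Map<String, String> merge(
            Map<String, String> userOverrides, Map<String, String> autoscalerOverrides) {
        Map<String, String> merged = new HashMap<>(userOverrides);
        merged.putAll(autoscalerOverrides);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> user = parse("vertexA:2,vertexC:4");
        Map<String, String> autoscaler = Map.of("vertexA", "8");
        // vertexA takes the autoscaler value, vertexC keeps the user value.
        System.out.println(merge(user, autoscaler));
    }
}
```

With replacement instead of a merge, the `vertexC` entry in this example would
be lost until the autoscaler learned about that vertex.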