mxm commented on code in PR #633:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/633#discussion_r1266728338
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -332,29 +321,55 @@ private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployCon
return false;
}
+ /**
+ * If there are any parallelism overrides by the {@link JobAutoScaler} apply them to the spec.
+ *
+ * @param autoscalerOverrides Parallelism overrides initiated by the autoscaler
+ * @param spec Current user spec
+ */
+ private void applyAutoscalerParallelismOverrides(
+ Map<String, String> autoscalerOverrides, SPEC spec) {
+
+ if (autoscalerOverrides.isEmpty()) {
+ return;
+ }
+
+ LOG.debug("Applying autoscaler parallelism overrides: {}", autoscalerOverrides);
+
+ var configMap = spec.getFlinkConfiguration();
+ var userOverridesStr =
+ configMap.getOrDefault(PipelineOptions.PARALLELISM_OVERRIDES.key(), "");
+ var userOverrides =
+ new HashMap<>(
+ ConfigurationUtils.<Map<String, String>>convertValue(
+ userOverridesStr, Map.class));
+
+ autoscalerOverrides.forEach(userOverrides::put);
Review Comment:
This is semantically different from the current behavior: here we apply the
autoscaler overrides on top of the user's overrides, so the config is no longer
the source of truth. That is probably reasonable given that the config is internal.
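To make the semantic difference concrete, here is a minimal, self-contained sketch of the merge order, assuming the user overrides have already been parsed into a `Map<String, String>` (the PR does this with `ConfigurationUtils.convertValue`). The `OverrideMerge` class and `merge` method are hypothetical. Because the autoscaler entries are put last, they replace any user-specified parallelism for the same vertex ID:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the override merge order; not operator code. */
public class OverrideMerge {

    /**
     * Returns the user overrides with the autoscaler overrides applied on top.
     * On a key conflict (same vertex ID) the autoscaler value wins.
     */
    static Map<String, String> merge(
            Map<String, String> userOverrides, Map<String, String> autoscalerOverrides) {
        // Copy so neither input map is mutated.
        Map<String, String> merged = new HashMap<>(userOverrides);
        // Same effect as autoscalerOverrides.forEach(merged::put).
        merged.putAll(autoscalerOverrides);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> user = Map.of("vertexA", "4", "vertexB", "2");
        Map<String, String> autoscaler = Map.of("vertexA", "8");
        Map<String, String> merged = merge(user, autoscaler);
        // vertexA comes from the autoscaler, vertexB from the user spec.
        System.out.println(merged.get("vertexA") + " " + merged.get("vertexB")); // prints "8 2"
    }
}
```

A user-set value for a vertex the autoscaler has touched is therefore silently shadowed, which is exactly why the config stops being the single source of truth.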
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##########
@@ -69,19 +72,45 @@ public JobAutoScalerImpl(
this.evaluator = evaluator;
this.scalingExecutor = scalingExecutor;
this.eventRecorder = eventRecorder;
+ this.infoManager = new AutoscalerInfoManager(kubernetesClient);
}
@Override
- public void cleanup(AbstractFlinkResource<?, ?> cr) {
+ public void cleanup(FlinkResourceContext<?> ctx) {
LOG.info("Cleaning up autoscaling meta data");
+ var cr = ctx.getResource();
metricsCollector.cleanup(cr);
var resourceId = ResourceID.fromResource(cr);
lastEvaluatedMetrics.remove(resourceId);
flinkMetrics.remove(resourceId);
+ infoManager.removeInfoFromCache(cr);
}
@Override
- public boolean scale(FlinkResourceContext<? extends AbstractFlinkResource<?, ?>> ctx) {
+ public Map<String, String> getParallelismOverrides(FlinkResourceContext<?> ctx) {
+ var conf = ctx.getObserveConfig();
+ try {
+ var infoOpt = infoManager.getInfo(ctx.getResource());
+ if (infoOpt.isPresent()) {
+ var info = infoOpt.get();
+ // If autoscaler/scaling was disabled after enabling, we need to delete the
+ // overrides
+ if (!conf.getBoolean(AUTOSCALER_ENABLED)
+ || !conf.getBoolean(AutoScalerOptions.SCALING_ENABLED)) {
+ info.removeCurrentOverrides();
Review Comment:
I think we should make this conditional only on the autoscaler
enabled/disabled flag. If users temporarily disable scaling, they don't want to
reset all autoscaler work (i.e., the overrides should not be cleared).
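A minimal sketch of the suggested behavior, assuming the stored overrides live in a simple in-memory holder. The `OverrideStore` class and its methods are hypothetical, and the two boolean parameters stand in for `AUTOSCALER_ENABLED` and `AutoScalerOptions.SCALING_ENABLED`. Only disabling the autoscaler clears the overrides; disabling scaling keeps the last computed ones in effect:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for autoscaler parallelism overrides; not operator code. */
public class OverrideStore {
    private final Map<String, String> currentOverrides = new HashMap<>();

    /** Replaces the stored overrides, e.g. after a new scaling decision. */
    void setOverrides(Map<String, String> overrides) {
        currentOverrides.clear();
        currentOverrides.putAll(overrides);
    }

    /**
     * Returns the overrides to apply to the spec.
     *
     * @param autoscalerEnabled stands in for AUTOSCALER_ENABLED
     * @param scalingEnabled stands in for SCALING_ENABLED; deliberately NOT used to clear state
     */
    Map<String, String> getParallelismOverrides(boolean autoscalerEnabled, boolean scalingEnabled) {
        if (!autoscalerEnabled) {
            // Autoscaler turned off: drop its overrides so the user spec applies again.
            currentOverrides.clear();
        }
        // If only scaling is disabled, keep returning the previously computed overrides.
        return Map.copyOf(currentOverrides);
    }
}
```

With this shape, toggling scaling off and back on leaves the earlier autoscaler decisions intact, while turning the autoscaler itself off resets the job to the user's parallelism.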
##########
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java:
##########
@@ -93,4 +95,10 @@ public ResourceLifecycleState getLifecycleState() {
return ResourceLifecycleState.DEPLOYED;
}
+
+ /**
+ * Internal flag to signal that due to some condition we need to schedule a new reconciliation
+ * loop immediately. For example autoscaler overrides have changed and we need to apply them.
+ */
+ @JsonIgnore @Internal private boolean immediateReconciliationNeeded = false;
Review Comment:
Would it be an option to re-trigger reconciliation from the current thread?
This feels like a bit of a workaround.
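One way to read the suggestion, sketched under the assumption that a single reconciliation pass can report whether another pass is needed (for example because autoscaler overrides changed): loop on the calling thread with an upper bound instead of persisting a flag in the status. The `InlineRetrigger` class and `reconcileUntilStable` helper are hypothetical, not an operator or Java Operator SDK API:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch: re-run reconciliation on the current thread instead of a status flag. */
public class InlineRetrigger {

    /**
     * Runs {@code reconcileOnce} repeatedly on the calling thread while it reports that another
     * pass is needed, up to {@code maxPasses} times.
     *
     * @return the number of passes that were executed
     */
    static int reconcileUntilStable(BooleanSupplier reconcileOnce, int maxPasses) {
        int passes = 0;
        boolean again = true;
        // The bound guards against a pass that never settles.
        while (again && passes < maxPasses) {
            again = reconcileOnce.getAsBoolean();
            passes++;
        }
        return passes;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // First pass detects new overrides, second pass applies them and is stable.
        int passes = reconcileUntilStable(() -> ++calls[0] < 2, 5);
        System.out.println(passes); // prints 2
    }
}
```

The trade-off is that the loop holds the reconciler thread for the extra passes, whereas a status flag hands control back to the framework's scheduling.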
--
This is an automated message from the Apache Git Service.