Dennis-Mircea commented on code in PR #1085:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/1085#discussion_r3443209776
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -70,27 +77,43 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
public static final String RESOURCE_QUOTA_REACHED_MESSAGE =
"Resource usage is above the allowed limit for scaling operations. Please adjust the resource quota manually.";
+ public static final String SCALING_VETOED_REASON = "ScalingVetoed";
+ public static final String SCALING_APPROVED_REASON = "ScalingApproved";
+
private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
private final JobVertexScaler<KEY, Context> jobVertexScaler;
private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
private final AutoScalerStateStore<KEY, Context> autoScalerStateStore;
private final ResourceCheck resourceCheck;
+ private final Collection<ScalingExecutorPlugin<KEY, Context>> discoveredScalingCustomExecutors;
Review Comment:
Yes, that's right; I missed this one in my previous refinements. It's fixed now!
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
return !scaleEnabled || isExcluded;
}
+ /**
+ * Apply pluggable scaling custom executors (SPI extension point).
+ *
+ * <p>The chain to apply is resolved per reconciliation against the per-job Configuration,
+ * mirroring custom metric evaluator plugin flow.
+ */
+ private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
Review Comment:
Addressed!
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
return !scaleEnabled || isExcluded;
}
+ /**
+ * Apply pluggable scaling custom executors (SPI extension point).
+ *
+ * <p>The chain to apply is resolved per reconciliation against the per-job Configuration,
+ * mirroring custom metric evaluator plugin flow.
+ */
+ private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+ Context context,
+ Configuration conf,
+ EvaluatedMetrics evaluatedMetrics,
+ JobTopology jobTopology,
+ Map<JobVertexID, ScalingSummary> scalingSummaries) {
+ Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>> chain =
+ resolveCustomScalingExecutors(conf);
+ if (chain.isEmpty()) {
+ return scalingSummaries;
+ }
+ for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry : chain) {
+ var instanceName = entry.getKey();
+ var plugin = entry.getValue();
+ var pluginClassName = plugin.getClass().getName();
+ var pluginConfiguration =
+ AutoScalerOptions.customScalingExecutorConfiguration(conf, instanceName);
+ var pluginContext =
+ new ScalingExecutorPlugin.Context<>(
+ context, pluginConfiguration, evaluatedMetrics, jobTopology);
+ scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+ if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+ LOG.info(
+ "Scaling vetoed by scaling custom executor instance '{}' (class={}).",
+ instanceName,
+ pluginClassName);
+ autoScalerEventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Warning,
+ SCALING_VETOED_REASON,
+ String.format(
+ "Scaling decision vetoed by scaling custom executor instance '%s' (class=%s).",
+ instanceName, pluginClassName),
+ SCALING_VETOED_REASON + ":" + instanceName,
+ conf.get(SCALING_EVENT_INTERVAL));
+ return null;
+ }
+ LOG.info(
+ "Scaling decision approved by scaling custom executor instance '{}' (class={}) with summaries: {}",
+ instanceName,
+ pluginClassName,
+ scalingSummaries.values());
+ autoScalerEventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Normal,
+ SCALING_APPROVED_REASON,
Review Comment:
I removed it!
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
return !scaleEnabled || isExcluded;
}
+ /**
+ * Apply pluggable scaling custom executors (SPI extension point).
+ *
+ * <p>The chain to apply is resolved per reconciliation against the per-job Configuration,
+ * mirroring custom metric evaluator plugin flow.
+ */
+ private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+ Context context,
+ Configuration conf,
+ EvaluatedMetrics evaluatedMetrics,
+ JobTopology jobTopology,
+ Map<JobVertexID, ScalingSummary> scalingSummaries) {
+ Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>> chain =
+ resolveCustomScalingExecutors(conf);
+ if (chain.isEmpty()) {
+ return scalingSummaries;
+ }
+ for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry : chain) {
+ var instanceName = entry.getKey();
+ var plugin = entry.getValue();
+ var pluginClassName = plugin.getClass().getName();
+ var pluginConfiguration =
+ AutoScalerOptions.customScalingExecutorConfiguration(conf, instanceName);
+ var pluginContext =
+ new ScalingExecutorPlugin.Context<>(
+ context, pluginConfiguration, evaluatedMetrics, jobTopology);
+ scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+ if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+ LOG.info(
+ "Scaling vetoed by scaling custom executor instance '{}' (class={}).",
+ instanceName,
+ pluginClassName);
+ autoScalerEventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Warning,
Review Comment:
Because it cancels the autoscaler's scaling decision entirely, I see it more as a warning than as an info event.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
return !scaleEnabled || isExcluded;
}
+ /**
+ * Apply pluggable scaling custom executors (SPI extension point).
+ *
+ * <p>The chain to apply is resolved per reconciliation against the per-job Configuration,
+ * mirroring custom metric evaluator plugin flow.
+ */
+ private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+ Context context,
+ Configuration conf,
+ EvaluatedMetrics evaluatedMetrics,
+ JobTopology jobTopology,
+ Map<JobVertexID, ScalingSummary> scalingSummaries) {
+ Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>> chain =
+ resolveCustomScalingExecutors(conf);
+ if (chain.isEmpty()) {
+ return scalingSummaries;
+ }
+ for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry : chain) {
+ var instanceName = entry.getKey();
+ var plugin = entry.getValue();
+ var pluginClassName = plugin.getClass().getName();
+ var pluginConfiguration =
+ AutoScalerOptions.customScalingExecutorConfiguration(conf, instanceName);
+ var pluginContext =
+ new ScalingExecutorPlugin.Context<>(
+ context, pluginConfiguration, evaluatedMetrics, jobTopology);
+ scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+ if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+ LOG.info(
+ "Scaling vetoed by scaling custom executor instance '{}' (class={}).",
+ instanceName,
+ pluginClassName);
+ autoScalerEventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Warning,
+ SCALING_VETOED_REASON,
+ String.format(
+ "Scaling decision vetoed by scaling custom executor instance '%s' (class=%s).",
+ instanceName, pluginClassName),
+ SCALING_VETOED_REASON + ":" + instanceName,
+ conf.get(SCALING_EVENT_INTERVAL));
+ return null;
+ }
+ LOG.info(
+ "Scaling decision approved by scaling custom executor instance '{}' (class={}) with summaries: {}",
Review Comment:
Right, I removed both the log and the event and left a single INFO log message instead, emitted only when a plugin changes the `scalingSummaries`. I also moved the general scaling event so that it is emitted after the memory tuning and the custom executors have been applied.
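A dependency-free Java sketch of the chain semantics discussed in this thread: ascending `priority()` order, a veto when a plugin returns `null` or an empty map, and a log line only when a plugin actually changes the summaries. It is not the PR's `applyScalingExecutorPlugins`. `ChainSketch`, `Plugin`, and `applyChain` are hypothetical; `String` vertex IDs and `Integer` target parallelisms stand in for `JobVertexID` and `ScalingSummary`; and the in-memory `LOG` list replaces SLF4J and `AutoScalerEventHandler`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of a veto-aware plugin chain. Vertex IDs are plain Strings and
 * summaries are target parallelisms, standing in for JobVertexID and ScalingSummary.
 */
final class ChainSketch {

    /** Hypothetical stand-in for ScalingExecutorPlugin. */
    interface Plugin {
        /** Lower values run first, mirroring the priority() contract in the PR. */
        default int priority() {
            return 0;
        }

        Map<String, Integer> apply(Map<String, Integer> summaries);
    }

    /** Collected messages, standing in for SLF4J logging and the event handler. */
    static final List<String> LOG = new ArrayList<>();

    /** Returns the final summaries, or null if any plugin vetoes the scaling. */
    static Map<String, Integer> applyChain(
            Map<String, Plugin> instances, Map<String, Integer> summaries) {
        List<Map.Entry<String, Plugin>> chain = new ArrayList<>(instances.entrySet());
        // Ascending priority; List.sort is stable, so ties keep the map's iteration order.
        chain.sort(Comparator.comparingInt(e -> e.getValue().priority()));

        Map<String, Integer> current = summaries;
        for (Map.Entry<String, Plugin> entry : chain) {
            // Read-only view: an in-place edit throws UnsupportedOperationException,
            // so every change has to come back as a new map.
            Map<String, Integer> next =
                    entry.getValue().apply(Collections.unmodifiableMap(current));
            if (next == null || next.isEmpty()) {
                // Veto: the real executor would emit a Warning event and skip scaling.
                LOG.add("vetoed by " + entry.getKey());
                return null;
            }
            if (!next.equals(current)) {
                // Log only when the plugin actually changed the decision.
                LOG.add("modified by " + entry.getKey() + ": " + next);
            }
            current = next;
        }
        return current;
    }
}
```

Handing each plugin a read-only view is a choice of this sketch, not something the PR is known to do. Ties in priority keep insertion order here only because `List.sort` is stable; the interface Javadoc gives no such guarantee, so plugins shouldn't depend on it.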
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutorPlugin.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A pluggable plugin that allows users to intercept, modify, or reject computed scaling decisions
+ * before they are applied. Implementations are invoked in the {@link ScalingExecutor} after scaling
+ * summaries have been computed and the blocked check has passed, but before the actual application
+ * of parallelism overrides.
+ *
+ * <p>Multiple plugin implementations can be chained. Each plugin receives the (possibly modified)
+ * output of the previous plugin. If any plugin returns an empty map, the scaling operation is
+ * vetoed entirely.
+ *
+ * <p>This was introduced as part of <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-575%3A+Scaling+Executor+Plugin+SPI+for+Flink+Autoscaler">FLIP-575:
+ * Scaling Executor Plugin SPI for Flink Autoscaler</a> and is complementary to <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-514%3A+Custom+Evaluator+plugin+for+Flink+Autoscaler">FLIP-514:
+ * Custom Evaluator plugin for Flink Autoscaler</a> which provides extensibility at the metric
+ * evaluation layer.
+ *
+ * <p>Implementations are discovered via Java's {@link java.util.ServiceLoader} mechanism. To
+ * register a custom plugin, add the fully qualified class name of the implementation to {@code
+ * META-INF/services/org.apache.flink.autoscaler.ScalingExecutorPlugin}.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+public interface ScalingExecutorPlugin<KEY, CTX extends JobAutoScalerContext<KEY>> {
+
+ /**
+ * Returns the priority of this plugin in the chain. Plugins with lower priority values are
+ * executed first. The default priority is 0. Plugins with equal priority have no guaranteed
+ * relative ordering.
+ *
+ * <p>Example usage: a resource-gating plugin (priority -100) should run before a
+ * parallelism-alignment plugin (priority 0), which should run before a cost-cap plugin
+ * (priority 100).
+ *
+ * @return the priority value; lower values execute first.
+ */
+ default int priority() {
+ return 0;
+ }
+
+ /**
+ * Applies the plugin logic to the computed scaling summaries before they are executed.
+ *
+ * <p>Implementations can:
+ *
+ * <ul>
+ * <li>Return the summaries unmodified to approve the scaling decision as-is.
+ * <li>Return a modified map (e.g., with adjusted parallelism values) to alter the decision.
+ * <li>Return an empty map (via {@link Collections#emptyMap()}) to veto/reject the scaling
+ * operation entirely.
+ * </ul>
+ *
+ * @param context The plugin context wrapping the autoscaler context, configuration, evaluated
+ * metrics, and job topology.
+ * @param scalingSummaries The computed scaling summaries keyed by vertex ID. This map contains
+ * only vertices that require a parallelism change.
+ * @return The (possibly modified) scaling summaries to apply, or an empty map to veto the
+ * scaling.
+ */
+ Map<JobVertexID, ScalingSummary> apply(
+ Context<KEY, CTX> context, Map<JobVertexID, ScalingSummary> scalingSummaries);
+
+ /**
+ * Immutable context wrapping all inputs available to the scaling executor plugin, providing
+ * access to the autoscaler context, configuration, evaluated metrics, and job topology.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+ @Value
+ class Context<KEY, CTX extends JobAutoScalerContext<KEY>> {
+
+ /** The autoscaler context for the current job. */
+ CTX autoScalerContext;
Review Comment:
I'd keep the wrapped `autoScalerContext` field rather than extend it. Not extending it keeps the plugin context generic, so it works for both the operator autoscaler and the standalone autoscaler; extending it would introduce a lot of unnecessary complexity.
Not extending CTX is also easier to maintain, since the plugin context doesn't have to follow every change that CTX may introduce.
As for doing the same in the evaluator plugin, I'd leave it as is. The evaluator runs at a different phase (metric evaluation), and the `ScalingMetricEvaluator` doesn't even receive the actual autoscaler context from `JobAutoScalerImpl`.
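A minimal sketch of the wrapping argument, built on simplified stand-ins: `AutoScalerCtx`, `OperatorCtx`, `StandaloneCtx`, and `PluginContext` are hypothetical classes, not the real `JobAutoScalerContext` hierarchy or the Lombok `@Value` class in this PR. Because `PluginContext` holds a `C` instead of extending a fixed context type, a plugin gets the concrete context (including subclass-only fields) for either autoscaler flavor, without one plugin-context subclass per context type.

```java
import java.util.Map;

/** Hypothetical sketch: composition keeps the plugin context generic over CTX. */
final class ContextSketch {

    /** Simplified stand-in for JobAutoScalerContext<KEY>. */
    static class AutoScalerCtx<K> {
        private final K jobKey;

        AutoScalerCtx(K jobKey) {
            this.jobKey = jobKey;
        }

        K getJobKey() {
            return jobKey;
        }
    }

    /** Operator-style context keyed by a resource name, with an extra field. */
    static final class OperatorCtx extends AutoScalerCtx<String> {
        private final String namespace;

        OperatorCtx(String name, String namespace) {
            super(name);
            this.namespace = namespace;
        }

        String getNamespace() {
            return namespace;
        }
    }

    /** Standalone-style context keyed by a numeric job id. */
    static final class StandaloneCtx extends AutoScalerCtx<Long> {
        StandaloneCtx(long jobId) {
            super(jobId);
        }
    }

    /** Wraps (rather than extends) whatever context type it is given. */
    static final class PluginContext<K, C extends AutoScalerCtx<K>> {
        private final C autoScalerContext;
        private final Map<String, String> configuration;

        PluginContext(C autoScalerContext, Map<String, String> configuration) {
            this.autoScalerContext = autoScalerContext;
            // Immutable copy, loosely mirroring the immutability of the @Value class.
            this.configuration = Map.copyOf(configuration);
        }

        C getAutoScalerContext() {
            return autoScalerContext;
        }

        Map<String, String> getConfiguration() {
            return configuration;
        }
    }
}
```

Adding a field to `AutoScalerCtx` or one of its subclasses requires no change to `PluginContext`, which is the maintenance point made above.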
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutorPlugin.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A pluggable plugin that allows users to intercept, modify, or reject computed scaling decisions
+ * before they are applied. Implementations are invoked in the {@link ScalingExecutor} after scaling
+ * summaries have been computed and the blocked check has passed, but before the actual application
+ * of parallelism overrides.
+ *
+ * <p>Multiple plugin implementations can be chained. Each plugin receives the (possibly modified)
+ * output of the previous plugin. If any plugin returns an empty map, the scaling operation is
+ * vetoed entirely.
+ *
+ * <p>This was introduced as part of <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-575%3A+Scaling+Executor+Plugin+SPI+for+Flink+Autoscaler">FLIP-575:
+ * Scaling Executor Plugin SPI for Flink Autoscaler</a> and is complementary to <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-514%3A+Custom+Evaluator+plugin+for+Flink+Autoscaler">FLIP-514:
+ * Custom Evaluator plugin for Flink Autoscaler</a> which provides extensibility at the metric
+ * evaluation layer.
+ *
+ * <p>Implementations are discovered via Java's {@link java.util.ServiceLoader} mechanism. To
+ * register a custom plugin, add the fully qualified class name of the implementation to {@code
+ * META-INF/services/org.apache.flink.autoscaler.ScalingExecutorPlugin}.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+public interface ScalingExecutorPlugin<KEY, CTX extends JobAutoScalerContext<KEY>> {
+
+ /**
+ * Returns the priority of this plugin in the chain. Plugins with lower priority values are
+ * executed first. The default priority is 0. Plugins with equal priority have no guaranteed
+ * relative ordering.
+ *
+ * <p>Example usage: a resource-gating plugin (priority -100) should run before a
+ * parallelism-alignment plugin (priority 0), which should run before a cost-cap plugin
+ * (priority 100).
+ *
+ * @return the priority value; lower values execute first.
+ */
+ default int priority() {
+ return 0;
+ }
+
+ /**
+ * Applies the plugin logic to the computed scaling summaries before they are executed.
+ *
+ * <p>Implementations can:
+ *
+ * <ul>
+ * <li>Return the summaries unmodified to approve the scaling decision as-is.
+ * <li>Return a modified map (e.g., with adjusted parallelism values) to alter the decision.
+ * <li>Return an empty map (via {@link Collections#emptyMap()}) to veto/reject the scaling
+ * operation entirely.
+ * </ul>
+ *
+ * @param context The plugin context wrapping the autoscaler context, configuration, evaluated
+ * metrics, and job topology.
+ * @param scalingSummaries The computed scaling summaries keyed by vertex ID. This map contains
+ * only vertices that require a parallelism change.
+ * @return The (possibly modified) scaling summaries to apply, or an empty map to veto the
+ * scaling.
+ */
+ Map<JobVertexID, ScalingSummary> apply(
+ Context<KEY, CTX> context, Map<JobVertexID, ScalingSummary> scalingSummaries);
+
+ /**
+ * Immutable context wrapping all inputs available to the scaling executor plugin, providing
+ * access to the autoscaler context, configuration, evaluated metrics, and job topology.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+ @Value
+ class Context<KEY, CTX extends JobAutoScalerContext<KEY>> {
+
+ /** The autoscaler context for the current job. */
+ CTX autoScalerContext;
+
+ /**
+ * The prefix-stripped, per-plugin configuration for this invocation, populated from {@code
+ * job.autoscaler.scaling.custom-executor.<name>.<parameter>} entries (with the {@code
+ * kubernetes.operator.} legacy fallback honored). Never {@code null}; empty when the plugin
+ * has no per-instance options configured.
+ */
+ Configuration configuration;
Review Comment:
This field isn't the job configuration held by the autoscaler context; it's the per-instance, prefix-stripped plugin options, built from `job.autoscaler.scaling.custom-executor.<name>.<parameter>`. It's intentionally distinct from `autoScalerContext.getConfiguration()`, which is the shared job-level config.
It has to be instance-scoped because the executor supports multiple instances of the same plugin class registered under different `<name>`s, each receiving its own options.
I considered merging it into the job config (as the evaluator plugin does), but the evaluator is single-instance, so a merged view is unambiguous there. For a multi-instance chain I'd rather keep the Configuration explicit.
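To make the per-instance view concrete, here is a rough sketch of prefix stripping over a flat key/value map. It is not `AutoScalerOptions.customScalingExecutorConfiguration`, whose implementation isn't shown in this thread: `PluginConfigSketch` and `forInstance` are hypothetical, a `Map<String, String>` stands in for Flink's `Configuration`, the option names `max-parallelism` and `dry-run` are made up, and the rule that an unprefixed key wins over its `kubernetes.operator.`-prefixed legacy twin is an assumption drawn from the word "fallback" in the field's Javadoc.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of building a per-instance, prefix-stripped plugin config. */
final class PluginConfigSketch {

    static final String EXECUTOR_PREFIX = "job.autoscaler.scaling.custom-executor.";
    static final String LEGACY_PREFIX = "kubernetes.operator.";

    /**
     * Collects the options of one named instance with "<prefix><name>." removed.
     * Never returns null; the result is empty when nothing is configured.
     */
    static Map<String, String> forInstance(Map<String, String> jobConf, String instanceName) {
        // The trailing dot stops instance "cap" from matching keys of instance "capA".
        String instancePrefix = EXECUTOR_PREFIX + instanceName + ".";
        String legacyInstancePrefix = LEGACY_PREFIX + instancePrefix;
        Map<String, String> options = new HashMap<>();
        // Pass 1: legacy keys, used only as a fallback (assumed precedence).
        jobConf.forEach(
                (key, value) -> {
                    if (key.startsWith(legacyInstancePrefix)) {
                        options.put(key.substring(legacyInstancePrefix.length()), value);
                    }
                });
        // Pass 2: current keys overwrite any legacy value for the same option.
        jobConf.forEach(
                (key, value) -> {
                    if (key.startsWith(instancePrefix)) {
                        options.put(key.substring(instancePrefix.length()), value);
                    }
                });
        return options;
    }
}
```

Instances `capA` and `capB` could be the same plugin class configured twice; each receives only its own `max-parallelism`, which a single merged job-level view could not express without keeping the per-instance prefixes.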
--
This is an automated message from the Apache Git Service.