gyfora commented on code in PR #1085:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/1085#discussion_r3442523379
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -70,27 +77,43 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
public static final String RESOURCE_QUOTA_REACHED_MESSAGE =
"Resource usage is above the allowed limit for scaling operations.
Please adjust the resource quota manually.";
+ public static final String SCALING_VETOED_REASON = "ScalingVetoed";
+ public static final String SCALING_APPROVED_REASON = "ScalingApproved";
+
private static final Logger LOG =
LoggerFactory.getLogger(ScalingExecutor.class);
private final JobVertexScaler<KEY, Context> jobVertexScaler;
private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
private final AutoScalerStateStore<KEY, Context> autoScalerStateStore;
private final ResourceCheck resourceCheck;
+ private final Collection<ScalingExecutorPlugin<KEY, Context>>
discoveredScalingCustomExecutors;
Review Comment:
Why not simply call it `customExecutors`?
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
return !scaleEnabled || isExcluded;
}
+ /**
+ * Apply pluggable scaling custom executors (SPI extension point).
+ *
+ * <p>The chain to apply is resolved per reconciliation against the
per-job Configuration,
+ * mirroring custom metric evaluator plugin flow.
+ */
+ private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+ Context context,
+ Configuration conf,
+ EvaluatedMetrics evaluatedMetrics,
+ JobTopology jobTopology,
+ Map<JobVertexID, ScalingSummary> scalingSummaries) {
+ Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>>
chain =
+ resolveCustomScalingExecutors(conf);
+ if (chain.isEmpty()) {
+ return scalingSummaries;
+ }
+ for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry :
chain) {
+ var instanceName = entry.getKey();
+ var plugin = entry.getValue();
+ var pluginClassName = plugin.getClass().getName();
+ var pluginConfiguration =
+ AutoScalerOptions.customScalingExecutorConfiguration(conf,
instanceName);
+ var pluginContext =
+ new ScalingExecutorPlugin.Context<>(
+ context, pluginConfiguration, evaluatedMetrics,
jobTopology);
+ scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+ if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+ LOG.info(
+ "Scaling vetoed by scaling custom executor instance
'{}' (class={}).",
+ instanceName,
+ pluginClassName);
+ autoScalerEventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Warning,
+ SCALING_VETOED_REASON,
+ String.format(
+ "Scaling decision vetoed by scaling custom
executor instance '%s' (class=%s).",
+ instanceName, pluginClassName),
+ SCALING_VETOED_REASON + ":" + instanceName,
+ conf.get(SCALING_EVENT_INTERVAL));
+ return null;
+ }
+ LOG.info(
+ "Scaling decision approved by scaling custom executor
instance '{}' (class={}) with summaries: {}",
+ instanceName,
+ pluginClassName,
+ scalingSummaries.values());
+ autoScalerEventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Normal,
+ SCALING_APPROVED_REASON,
Review Comment:
This should not be an event
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutorPlugin.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A pluggable plugin that allows users to intercept, modify, or reject
computed scaling decisions
+ * before they are applied. Implementations are invoked in the {@link
ScalingExecutor} after scaling
+ * summaries have been computed and the blocked check has passed, but before
the actual application
+ * of parallelism overrides.
+ *
+ * <p>Multiple plugin implementations can be chained. Each plugin receives the
(possibly modified)
+ * output of the previous plugin. If any plugin returns an empty map, the
scaling operation is
+ * vetoed entirely.
+ *
+ * <p>This was introduced as part of <a
+ *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-575%3A+Scaling+Executor+Plugin+SPI+for+Flink+Autoscaler">FLIP-575:
+ * Scaling Executor Plugin SPI for Flink Autoscaler</a> and is complementary
to <a
+ *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-514%3A+Custom+Evaluator+plugin+for+Flink+Autoscaler">FLIP-514:
+ * Custom Evaluator plugin for Flink Autoscaler</a> which provides
extensibility at the metric
+ * evaluation layer.
+ *
+ * <p>Implementations are discovered via Java's {@link
java.util.ServiceLoader} mechanism. To
+ * register a custom plugin, add the fully qualified class name of the
implementation to {@code
+ * META-INF/services/org.apache.flink.autoscaler.ScalingExecutorPlugin}.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+public interface ScalingExecutorPlugin<KEY, CTX extends
JobAutoScalerContext<KEY>> {
+
+ /**
+ * Returns the priority of this plugin in the chain. Plugins with lower
priority values are
+ * executed first. The default priority is 0. Plugins with equal priority
have no guaranteed
+ * relative ordering.
+ *
+ * <p>Example usage: a resource-gating plugin (priority -100) should run
before a
+ * parallelism-alignment plugin (priority 0), which should run before a
cost-cap plugin
+ * (priority 100).
+ *
+ * @return the priority value; lower values execute first.
+ */
+ default int priority() {
+ return 0;
+ }
+
+ /**
+ * Applies the plugin logic to the computed scaling summaries before they
are executed.
+ *
+ * <p>Implementations can:
+ *
+ * <ul>
+ * <li>Return the summaries unmodified to approve the scaling decision
as-is.
+ * <li>Return a modified map (e.g., with adjusted parallelism values) to
alter the decision.
+ * <li>Return an empty map (via {@link Collections#emptyMap()}) to
veto/reject the scaling
+ * operation entirely.
+ * </ul>
+ *
+ * @param context The plugin context wrapping the autoscaler context,
configuration, evaluated
+ * metrics, and job topology.
+ * @param scalingSummaries The computed scaling summaries keyed by vertex
ID. This map contains
+ * only vertices that require a parallelism change.
+ * @return The (possibly modified) scaling summaries to apply, or an empty
map to veto the
+ * scaling.
+ */
+ Map<JobVertexID, ScalingSummary> apply(
+ Context<KEY, CTX> context, Map<JobVertexID, ScalingSummary>
scalingSummaries);
+
+ /**
+ * Immutable context wrapping all inputs available to the scaling executor
plugin, providing
+ * access to the autoscaler context, configuration, evaluated metrics, and
job topology.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+ @Value
+ class Context<KEY, CTX extends JobAutoScalerContext<KEY>> {
+
+ /** The autoscaler context for the current job. */
+ CTX autoScalerContext;
+
+ /**
+ * The prefix-stripped, per-plugin configuration for this invocation,
populated from {@code
+ * job.autoscaler.scaling.custom-executor.<name>.<parameter>} entries
(with the {@code
+ * kubernetes.operator.} legacy fallback honored). Never {@code null};
empty when the plugin
+ * has no per-instance options configured.
+ */
+ Configuration configuration;
Review Comment:
The autoscaler context already has a configuration.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
return !scaleEnabled || isExcluded;
}
+ /**
+ * Apply pluggable scaling custom executors (SPI extension point).
+ *
+ * <p>The chain to apply is resolved per reconciliation against the
per-job Configuration,
+ * mirroring custom metric evaluator plugin flow.
+ */
+ private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
Review Comment:
Rename to `applyCustomExecutors`.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutorPlugin.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A pluggable plugin that allows users to intercept, modify, or reject
computed scaling decisions
+ * before they are applied. Implementations are invoked in the {@link
ScalingExecutor} after scaling
+ * summaries have been computed and the blocked check has passed, but before
the actual application
+ * of parallelism overrides.
+ *
+ * <p>Multiple plugin implementations can be chained. Each plugin receives the
(possibly modified)
+ * output of the previous plugin. If any plugin returns an empty map, the
scaling operation is
+ * vetoed entirely.
+ *
+ * <p>This was introduced as part of <a
+ *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-575%3A+Scaling+Executor+Plugin+SPI+for+Flink+Autoscaler">FLIP-575:
+ * Scaling Executor Plugin SPI for Flink Autoscaler</a> and is complementary
to <a
+ *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-514%3A+Custom+Evaluator+plugin+for+Flink+Autoscaler">FLIP-514:
+ * Custom Evaluator plugin for Flink Autoscaler</a> which provides
extensibility at the metric
+ * evaluation layer.
+ *
+ * <p>Implementations are discovered via Java's {@link
java.util.ServiceLoader} mechanism. To
+ * register a custom plugin, add the fully qualified class name of the
implementation to {@code
+ * META-INF/services/org.apache.flink.autoscaler.ScalingExecutorPlugin}.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+public interface ScalingExecutorPlugin<KEY, CTX extends
JobAutoScalerContext<KEY>> {
+
+ /**
+ * Returns the priority of this plugin in the chain. Plugins with lower
priority values are
+ * executed first. The default priority is 0. Plugins with equal priority
have no guaranteed
+ * relative ordering.
+ *
+ * <p>Example usage: a resource-gating plugin (priority -100) should run
before a
+ * parallelism-alignment plugin (priority 0), which should run before a
cost-cap plugin
+ * (priority 100).
+ *
+ * @return the priority value; lower values execute first.
+ */
+ default int priority() {
+ return 0;
+ }
+
+ /**
+ * Applies the plugin logic to the computed scaling summaries before they
are executed.
+ *
+ * <p>Implementations can:
+ *
+ * <ul>
+ * <li>Return the summaries unmodified to approve the scaling decision
as-is.
+ * <li>Return a modified map (e.g., with adjusted parallelism values) to
alter the decision.
+ * <li>Return an empty map (via {@link Collections#emptyMap()}) to
veto/reject the scaling
+ * operation entirely.
+ * </ul>
+ *
+ * @param context The plugin context wrapping the autoscaler context,
configuration, evaluated
+ * metrics, and job topology.
+ * @param scalingSummaries The computed scaling summaries keyed by vertex
ID. This map contains
+ * only vertices that require a parallelism change.
+ * @return The (possibly modified) scaling summaries to apply, or an empty
map to veto the
+ * scaling.
+ */
+ Map<JobVertexID, ScalingSummary> apply(
+ Context<KEY, CTX> context, Map<JobVertexID, ScalingSummary>
scalingSummaries);
+
+ /**
+ * Immutable context wrapping all inputs available to the scaling executor
plugin, providing
+ * access to the autoscaler context, configuration, evaluated metrics, and
job topology.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+ @Value
+ class Context<KEY, CTX extends JobAutoScalerContext<KEY>> {
+
+ /** The autoscaler context for the current job. */
+ CTX autoScalerContext;
Review Comment:
Maybe this should extend the autoscaler context instead? If so, should we do the same
in the evaluator plugin too?
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
return !scaleEnabled || isExcluded;
}
+ /**
+ * Apply pluggable scaling custom executors (SPI extension point).
+ *
+ * <p>The chain to apply is resolved per reconciliation against the
per-job Configuration,
+ * mirroring custom metric evaluator plugin flow.
+ */
+ private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+ Context context,
+ Configuration conf,
+ EvaluatedMetrics evaluatedMetrics,
+ JobTopology jobTopology,
+ Map<JobVertexID, ScalingSummary> scalingSummaries) {
+ Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>>
chain =
+ resolveCustomScalingExecutors(conf);
+ if (chain.isEmpty()) {
+ return scalingSummaries;
+ }
+ for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry :
chain) {
+ var instanceName = entry.getKey();
+ var plugin = entry.getValue();
+ var pluginClassName = plugin.getClass().getName();
+ var pluginConfiguration =
+ AutoScalerOptions.customScalingExecutorConfiguration(conf,
instanceName);
+ var pluginContext =
+ new ScalingExecutorPlugin.Context<>(
+ context, pluginConfiguration, evaluatedMetrics,
jobTopology);
+ scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+ if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+ LOG.info(
+ "Scaling vetoed by scaling custom executor instance
'{}' (class={}).",
+ instanceName,
+ pluginClassName);
+ autoScalerEventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Warning,
Review Comment:
Why is this a warning?
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
return !scaleEnabled || isExcluded;
}
+ /**
+ * Apply pluggable scaling custom executors (SPI extension point).
+ *
+ * <p>The chain to apply is resolved per reconciliation against the
per-job Configuration,
+ * mirroring custom metric evaluator plugin flow.
+ */
+ private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+ Context context,
+ Configuration conf,
+ EvaluatedMetrics evaluatedMetrics,
+ JobTopology jobTopology,
+ Map<JobVertexID, ScalingSummary> scalingSummaries) {
+ Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>>
chain =
+ resolveCustomScalingExecutors(conf);
+ if (chain.isEmpty()) {
+ return scalingSummaries;
+ }
+ for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry :
chain) {
+ var instanceName = entry.getKey();
+ var plugin = entry.getValue();
+ var pluginClassName = plugin.getClass().getName();
+ var pluginConfiguration =
+ AutoScalerOptions.customScalingExecutorConfiguration(conf,
instanceName);
+ var pluginContext =
+ new ScalingExecutorPlugin.Context<>(
+ context, pluginConfiguration, evaluatedMetrics,
jobTopology);
+ scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+ if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+ LOG.info(
+ "Scaling vetoed by scaling custom executor instance
'{}' (class={}).",
+ instanceName,
+ pluginClassName);
+ autoScalerEventHandler.handleEvent(
+ context,
+ AutoScalerEventHandler.Type.Warning,
+ SCALING_VETOED_REASON,
+ String.format(
+ "Scaling decision vetoed by scaling custom
executor instance '%s' (class=%s).",
+ instanceName, pluginClassName),
+ SCALING_VETOED_REASON + ":" + instanceName,
+ conf.get(SCALING_EVENT_INTERVAL));
+ return null;
+ }
+ LOG.info(
+ "Scaling decision approved by scaling custom executor
instance '{}' (class={}) with summaries: {}",
Review Comment:
I don't think we should log this; this is pretty normal.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]