gyfora commented on code in PR #1085:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/1085#discussion_r3442523379


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -70,27 +77,43 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
     public static final String RESOURCE_QUOTA_REACHED_MESSAGE =
             "Resource usage is above the allowed limit for scaling operations. 
Please adjust the resource quota manually.";
 
+    public static final String SCALING_VETOED_REASON = "ScalingVetoed";
+    public static final String SCALING_APPROVED_REASON = "ScalingApproved";
+
     private static final Logger LOG = 
LoggerFactory.getLogger(ScalingExecutor.class);
 
     private final JobVertexScaler<KEY, Context> jobVertexScaler;
     private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
     private final AutoScalerStateStore<KEY, Context> autoScalerStateStore;
     private final ResourceCheck resourceCheck;
+    private final Collection<ScalingExecutorPlugin<KEY, Context>> 
discoveredScalingCustomExecutors;

Review Comment:
   Why not simply call it `customExecutors`?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
         return !scaleEnabled || isExcluded;
     }
 
+    /**
+     * Apply pluggable scaling custom executors (SPI extension point).
+     *
+     * <p>The chain to apply is resolved per reconciliation against the 
per-job Configuration,
+     * mirroring custom metric evaluator plugin flow.
+     */
+    private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+            Context context,
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            JobTopology jobTopology,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+        Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>> 
chain =
+                resolveCustomScalingExecutors(conf);
+        if (chain.isEmpty()) {
+            return scalingSummaries;
+        }
+        for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry : 
chain) {
+            var instanceName = entry.getKey();
+            var plugin = entry.getValue();
+            var pluginClassName = plugin.getClass().getName();
+            var pluginConfiguration =
+                    AutoScalerOptions.customScalingExecutorConfiguration(conf, 
instanceName);
+            var pluginContext =
+                    new ScalingExecutorPlugin.Context<>(
+                            context, pluginConfiguration, evaluatedMetrics, 
jobTopology);
+            scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+            if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+                LOG.info(
+                        "Scaling vetoed by scaling custom executor instance 
'{}' (class={}).",
+                        instanceName,
+                        pluginClassName);
+                autoScalerEventHandler.handleEvent(
+                        context,
+                        AutoScalerEventHandler.Type.Warning,
+                        SCALING_VETOED_REASON,
+                        String.format(
+                                "Scaling decision vetoed by scaling custom 
executor instance '%s' (class=%s).",
+                                instanceName, pluginClassName),
+                        SCALING_VETOED_REASON + ":" + instanceName,
+                        conf.get(SCALING_EVENT_INTERVAL));
+                return null;
+            }
+            LOG.info(
+                    "Scaling decision approved by scaling custom executor 
instance '{}' (class={}) with summaries: {}",
+                    instanceName,
+                    pluginClassName,
+                    scalingSummaries.values());
+            autoScalerEventHandler.handleEvent(
+                    context,
+                    AutoScalerEventHandler.Type.Normal,
+                    SCALING_APPROVED_REASON,

Review Comment:
   This should not be emitted as an event.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutorPlugin.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A pluggable plugin that allows users to intercept, modify, or reject 
computed scaling decisions
+ * before they are applied. Implementations are invoked in the {@link 
ScalingExecutor} after scaling
+ * summaries have been computed and the blocked check has passed, but before 
the actual application
+ * of parallelism overrides.
+ *
+ * <p>Multiple plugin implementations can be chained. Each plugin receives the 
(possibly modified)
+ * output of the previous plugin. If any plugin returns an empty map, the 
scaling operation is
+ * vetoed entirely.
+ *
+ * <p>This was introduced as part of <a
+ * 
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-575%3A+Scaling+Executor+Plugin+SPI+for+Flink+Autoscaler";>FLIP-575:
+ * Scaling Executor Plugin SPI for Flink Autoscaler</a> and is complementary 
to <a
+ * 
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-514%3A+Custom+Evaluator+plugin+for+Flink+Autoscaler";>FLIP-514:
+ * Custom Evaluator plugin for Flink Autoscaler</a> which provides 
extensibility at the metric
+ * evaluation layer.
+ *
+ * <p>Implementations are discovered via Java's {@link 
java.util.ServiceLoader} mechanism. To
+ * register a custom plugin, add the fully qualified class name of the 
implementation to {@code
+ * META-INF/services/org.apache.flink.autoscaler.ScalingExecutorPlugin}.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+public interface ScalingExecutorPlugin<KEY, CTX extends 
JobAutoScalerContext<KEY>> {
+
+    /**
+     * Returns the priority of this plugin in the chain. Plugins with lower 
priority values are
+     * executed first. The default priority is 0. Plugins with equal priority 
have no guaranteed
+     * relative ordering.
+     *
+     * <p>Example usage: a resource-gating plugin (priority -100) should run 
before a
+     * parallelism-alignment plugin (priority 0), which should run before a 
cost-cap plugin
+     * (priority 100).
+     *
+     * @return the priority value; lower values execute first.
+     */
+    default int priority() {
+        return 0;
+    }
+
+    /**
+     * Applies the plugin logic to the computed scaling summaries before they 
are executed.
+     *
+     * <p>Implementations can:
+     *
+     * <ul>
+     *   <li>Return the summaries unmodified to approve the scaling decision 
as-is.
+     *   <li>Return a modified map (e.g., with adjusted parallelism values) to 
alter the decision.
+     *   <li>Return an empty map (via {@link Collections#emptyMap()}) to 
veto/reject the scaling
+     *       operation entirely.
+     * </ul>
+     *
+     * @param context The plugin context wrapping the autoscaler context, 
configuration, evaluated
+     *     metrics, and job topology.
+     * @param scalingSummaries The computed scaling summaries keyed by vertex 
ID. This map contains
+     *     only vertices that require a parallelism change.
+     * @return The (possibly modified) scaling summaries to apply, or an empty 
map to veto the
+     *     scaling.
+     */
+    Map<JobVertexID, ScalingSummary> apply(
+            Context<KEY, CTX> context, Map<JobVertexID, ScalingSummary> 
scalingSummaries);
+
+    /**
+     * Immutable context wrapping all inputs available to the scaling executor 
plugin, providing
+     * access to the autoscaler context, configuration, evaluated metrics, and 
job topology.
+     *
+     * @param <KEY> The job key.
+     * @param <CTX> Instance of {@link JobAutoScalerContext}.
+     */
+    @Value
+    class Context<KEY, CTX extends JobAutoScalerContext<KEY>> {
+
+        /** The autoscaler context for the current job. */
+        CTX autoScalerContext;
+
+        /**
+         * The prefix-stripped, per-plugin configuration for this invocation, 
populated from {@code
+         * job.autoscaler.scaling.custom-executor.<name>.<parameter>} entries 
(with the {@code
+         * kubernetes.operator.} legacy fallback honored). Never {@code null}; 
empty when the plugin
+         * has no per-instance options configured.
+         */
+        Configuration configuration;

Review Comment:
   The autoscaler context already has the configuration.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
         return !scaleEnabled || isExcluded;
     }
 
+    /**
+     * Apply pluggable scaling custom executors (SPI extension point).
+     *
+     * <p>The chain to apply is resolved per reconciliation against the 
per-job Configuration,
+     * mirroring custom metric evaluator plugin flow.
+     */
+    private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(

Review Comment:
   Rename to `applyCustomExecutors`.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutorPlugin.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A pluggable plugin that allows users to intercept, modify, or reject 
computed scaling decisions
+ * before they are applied. Implementations are invoked in the {@link 
ScalingExecutor} after scaling
+ * summaries have been computed and the blocked check has passed, but before 
the actual application
+ * of parallelism overrides.
+ *
+ * <p>Multiple plugin implementations can be chained. Each plugin receives the 
(possibly modified)
+ * output of the previous plugin. If any plugin returns an empty map, the 
scaling operation is
+ * vetoed entirely.
+ *
+ * <p>This was introduced as part of <a
+ * 
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-575%3A+Scaling+Executor+Plugin+SPI+for+Flink+Autoscaler";>FLIP-575:
+ * Scaling Executor Plugin SPI for Flink Autoscaler</a> and is complementary 
to <a
+ * 
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-514%3A+Custom+Evaluator+plugin+for+Flink+Autoscaler";>FLIP-514:
+ * Custom Evaluator plugin for Flink Autoscaler</a> which provides 
extensibility at the metric
+ * evaluation layer.
+ *
+ * <p>Implementations are discovered via Java's {@link 
java.util.ServiceLoader} mechanism. To
+ * register a custom plugin, add the fully qualified class name of the 
implementation to {@code
+ * META-INF/services/org.apache.flink.autoscaler.ScalingExecutorPlugin}.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+public interface ScalingExecutorPlugin<KEY, CTX extends 
JobAutoScalerContext<KEY>> {
+
+    /**
+     * Returns the priority of this plugin in the chain. Plugins with lower 
priority values are
+     * executed first. The default priority is 0. Plugins with equal priority 
have no guaranteed
+     * relative ordering.
+     *
+     * <p>Example usage: a resource-gating plugin (priority -100) should run 
before a
+     * parallelism-alignment plugin (priority 0), which should run before a 
cost-cap plugin
+     * (priority 100).
+     *
+     * @return the priority value; lower values execute first.
+     */
+    default int priority() {
+        return 0;
+    }
+
+    /**
+     * Applies the plugin logic to the computed scaling summaries before they 
are executed.
+     *
+     * <p>Implementations can:
+     *
+     * <ul>
+     *   <li>Return the summaries unmodified to approve the scaling decision 
as-is.
+     *   <li>Return a modified map (e.g., with adjusted parallelism values) to 
alter the decision.
+     *   <li>Return an empty map (via {@link Collections#emptyMap()}) to 
veto/reject the scaling
+     *       operation entirely.
+     * </ul>
+     *
+     * @param context The plugin context wrapping the autoscaler context, 
configuration, evaluated
+     *     metrics, and job topology.
+     * @param scalingSummaries The computed scaling summaries keyed by vertex 
ID. This map contains
+     *     only vertices that require a parallelism change.
+     * @return The (possibly modified) scaling summaries to apply, or an empty 
map to veto the
+     *     scaling.
+     */
+    Map<JobVertexID, ScalingSummary> apply(
+            Context<KEY, CTX> context, Map<JobVertexID, ScalingSummary> 
scalingSummaries);
+
+    /**
+     * Immutable context wrapping all inputs available to the scaling executor 
plugin, providing
+     * access to the autoscaler context, configuration, evaluated metrics, and 
job topology.
+     *
+     * @param <KEY> The job key.
+     * @param <CTX> Instance of {@link JobAutoScalerContext}.
+     */
+    @Value
+    class Context<KEY, CTX extends JobAutoScalerContext<KEY>> {
+
+        /** The autoscaler context for the current job. */
+        CTX autoScalerContext;

Review Comment:
   Maybe this should extend the autoscaler context instead? If so, maybe do the 
same in the evaluator plugin too.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
         return !scaleEnabled || isExcluded;
     }
 
+    /**
+     * Apply pluggable scaling custom executors (SPI extension point).
+     *
+     * <p>The chain to apply is resolved per reconciliation against the 
per-job Configuration,
+     * mirroring custom metric evaluator plugin flow.
+     */
+    private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+            Context context,
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            JobTopology jobTopology,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+        Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>> 
chain =
+                resolveCustomScalingExecutors(conf);
+        if (chain.isEmpty()) {
+            return scalingSummaries;
+        }
+        for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry : 
chain) {
+            var instanceName = entry.getKey();
+            var plugin = entry.getValue();
+            var pluginClassName = plugin.getClass().getName();
+            var pluginConfiguration =
+                    AutoScalerOptions.customScalingExecutorConfiguration(conf, 
instanceName);
+            var pluginContext =
+                    new ScalingExecutorPlugin.Context<>(
+                            context, pluginConfiguration, evaluatedMetrics, 
jobTopology);
+            scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+            if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+                LOG.info(
+                        "Scaling vetoed by scaling custom executor instance 
'{}' (class={}).",
+                        instanceName,
+                        pluginClassName);
+                autoScalerEventHandler.handleEvent(
+                        context,
+                        AutoScalerEventHandler.Type.Warning,

Review Comment:
   Why is this a warning?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
         return !scaleEnabled || isExcluded;
     }
 
+    /**
+     * Apply pluggable scaling custom executors (SPI extension point).
+     *
+     * <p>The chain to apply is resolved per reconciliation against the 
per-job Configuration,
+     * mirroring custom metric evaluator plugin flow.
+     */
+    private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+            Context context,
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            JobTopology jobTopology,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+        Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>> 
chain =
+                resolveCustomScalingExecutors(conf);
+        if (chain.isEmpty()) {
+            return scalingSummaries;
+        }
+        for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry : 
chain) {
+            var instanceName = entry.getKey();
+            var plugin = entry.getValue();
+            var pluginClassName = plugin.getClass().getName();
+            var pluginConfiguration =
+                    AutoScalerOptions.customScalingExecutorConfiguration(conf, 
instanceName);
+            var pluginContext =
+                    new ScalingExecutorPlugin.Context<>(
+                            context, pluginConfiguration, evaluatedMetrics, 
jobTopology);
+            scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+            if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+                LOG.info(
+                        "Scaling vetoed by scaling custom executor instance 
'{}' (class={}).",
+                        instanceName,
+                        pluginClassName);
+                autoScalerEventHandler.handleEvent(
+                        context,
+                        AutoScalerEventHandler.Type.Warning,
+                        SCALING_VETOED_REASON,
+                        String.format(
+                                "Scaling decision vetoed by scaling custom 
executor instance '%s' (class=%s).",
+                                instanceName, pluginClassName),
+                        SCALING_VETOED_REASON + ":" + instanceName,
+                        conf.get(SCALING_EVENT_INTERVAL));
+                return null;
+            }
+            LOG.info(
+                    "Scaling decision approved by scaling custom executor 
instance '{}' (class={}) with summaries: {}",

Review Comment:
   I don't think we should log this; it is pretty normal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to