Dennis-Mircea commented on code in PR #1085:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/1085#discussion_r3443209776


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -70,27 +77,43 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
     public static final String RESOURCE_QUOTA_REACHED_MESSAGE =
             "Resource usage is above the allowed limit for scaling operations. 
Please adjust the resource quota manually.";
 
+    public static final String SCALING_VETOED_REASON = "ScalingVetoed";
+    public static final String SCALING_APPROVED_REASON = "ScalingApproved";
+
     private static final Logger LOG = 
LoggerFactory.getLogger(ScalingExecutor.class);
 
     private final JobVertexScaler<KEY, Context> jobVertexScaler;
     private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
     private final AutoScalerStateStore<KEY, Context> autoScalerStateStore;
     private final ResourceCheck resourceCheck;
+    private final Collection<ScalingExecutorPlugin<KEY, Context>> 
discoveredScalingCustomExecutors;

Review Comment:
   Yes, that's right; I missed this one in my earlier refinements. It's fixed 
now!



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
         return !scaleEnabled || isExcluded;
     }
 
+    /**
+     * Apply pluggable scaling custom executors (SPI extension point).
+     *
+     * <p>The chain to apply is resolved per reconciliation against the 
per-job Configuration,
+     * mirroring custom metric evaluator plugin flow.
+     */
+    private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(

Review Comment:
   Addressed!



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
         return !scaleEnabled || isExcluded;
     }
 
+    /**
+     * Apply pluggable scaling custom executors (SPI extension point).
+     *
+     * <p>The chain to apply is resolved per reconciliation against the 
per-job Configuration,
+     * mirroring custom metric evaluator plugin flow.
+     */
+    private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+            Context context,
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            JobTopology jobTopology,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+        Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>> 
chain =
+                resolveCustomScalingExecutors(conf);
+        if (chain.isEmpty()) {
+            return scalingSummaries;
+        }
+        for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry : 
chain) {
+            var instanceName = entry.getKey();
+            var plugin = entry.getValue();
+            var pluginClassName = plugin.getClass().getName();
+            var pluginConfiguration =
+                    AutoScalerOptions.customScalingExecutorConfiguration(conf, 
instanceName);
+            var pluginContext =
+                    new ScalingExecutorPlugin.Context<>(
+                            context, pluginConfiguration, evaluatedMetrics, 
jobTopology);
+            scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+            if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+                LOG.info(
+                        "Scaling vetoed by scaling custom executor instance 
'{}' (class={}).",
+                        instanceName,
+                        pluginClassName);
+                autoScalerEventHandler.handleEvent(
+                        context,
+                        AutoScalerEventHandler.Type.Warning,
+                        SCALING_VETOED_REASON,
+                        String.format(
+                                "Scaling decision vetoed by scaling custom 
executor instance '%s' (class=%s).",
+                                instanceName, pluginClassName),
+                        SCALING_VETOED_REASON + ":" + instanceName,
+                        conf.get(SCALING_EVENT_INTERVAL));
+                return null;
+            }
+            LOG.info(
+                    "Scaling decision approved by scaling custom executor 
instance '{}' (class={}) with summaries: {}",
+                    instanceName,
+                    pluginClassName,
+                    scalingSummaries.values());
+            autoScalerEventHandler.handleEvent(
+                    context,
+                    AutoScalerEventHandler.Type.Normal,
+                    SCALING_APPROVED_REASON,

Review Comment:
   I removed it!



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
         return !scaleEnabled || isExcluded;
     }
 
+    /**
+     * Apply pluggable scaling custom executors (SPI extension point).
+     *
+     * <p>The chain to apply is resolved per reconciliation against the 
per-job Configuration,
+     * mirroring custom metric evaluator plugin flow.
+     */
+    private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+            Context context,
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            JobTopology jobTopology,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+        Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>> 
chain =
+                resolveCustomScalingExecutors(conf);
+        if (chain.isEmpty()) {
+            return scalingSummaries;
+        }
+        for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry : 
chain) {
+            var instanceName = entry.getKey();
+            var plugin = entry.getValue();
+            var pluginClassName = plugin.getClass().getName();
+            var pluginConfiguration =
+                    AutoScalerOptions.customScalingExecutorConfiguration(conf, 
instanceName);
+            var pluginContext =
+                    new ScalingExecutorPlugin.Context<>(
+                            context, pluginConfiguration, evaluatedMetrics, 
jobTopology);
+            scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+            if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+                LOG.info(
+                        "Scaling vetoed by scaling custom executor instance 
'{}' (class={}).",
+                        instanceName,
+                        pluginClassName);
+                autoScalerEventHandler.handleEvent(
+                        context,
+                        AutoScalerEventHandler.Type.Warning,

Review Comment:
   Because a veto cancels the autoscaler's scaling decision entirely, I see it 
more as a warning than as an info event.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -459,6 +493,144 @@ private boolean checkIfBlockedAndTriggerScalingEvent(
         return !scaleEnabled || isExcluded;
     }
 
+    /**
+     * Apply pluggable scaling custom executors (SPI extension point).
+     *
+     * <p>The chain to apply is resolved per reconciliation against the 
per-job Configuration,
+     * mirroring custom metric evaluator plugin flow.
+     */
+    private Map<JobVertexID, ScalingSummary> applyScalingExecutorPlugins(
+            Context context,
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            JobTopology jobTopology,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+        Collection<Map.Entry<String, ScalingExecutorPlugin<KEY, Context>>> 
chain =
+                resolveCustomScalingExecutors(conf);
+        if (chain.isEmpty()) {
+            return scalingSummaries;
+        }
+        for (Map.Entry<String, ScalingExecutorPlugin<KEY, Context>> entry : 
chain) {
+            var instanceName = entry.getKey();
+            var plugin = entry.getValue();
+            var pluginClassName = plugin.getClass().getName();
+            var pluginConfiguration =
+                    AutoScalerOptions.customScalingExecutorConfiguration(conf, 
instanceName);
+            var pluginContext =
+                    new ScalingExecutorPlugin.Context<>(
+                            context, pluginConfiguration, evaluatedMetrics, 
jobTopology);
+            scalingSummaries = plugin.apply(pluginContext, scalingSummaries);
+            if (scalingSummaries == null || scalingSummaries.isEmpty()) {
+                LOG.info(
+                        "Scaling vetoed by scaling custom executor instance 
'{}' (class={}).",
+                        instanceName,
+                        pluginClassName);
+                autoScalerEventHandler.handleEvent(
+                        context,
+                        AutoScalerEventHandler.Type.Warning,
+                        SCALING_VETOED_REASON,
+                        String.format(
+                                "Scaling decision vetoed by scaling custom 
executor instance '%s' (class=%s).",
+                                instanceName, pluginClassName),
+                        SCALING_VETOED_REASON + ":" + instanceName,
+                        conf.get(SCALING_EVENT_INTERVAL));
+                return null;
+            }
+            LOG.info(
+                    "Scaling decision approved by scaling custom executor 
instance '{}' (class={}) with summaries: {}",

Review Comment:
   Right, I removed the log and the event and now emit a single INFO log 
message, and only when the `scalingSummaries` differ. I also moved the general 
scaling event so that it is emitted after memory tuning and the custom 
executors have been applied.
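   
   A minimal sketch of the "log only when something changed" idea, assuming 
"differ" means that a plugin's output is not equal to its input. The class 
`ChangeOnlyLogging`, the method `messageIfChanged`, and the use of plain 
strings and integers in place of `JobVertexID` and `ScalingSummary` are all 
hypothetical; this is not the code in the PR, and veto handling is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Illustrative only: vertex IDs are plain strings, summaries are target parallelisms. */
final class ChangeOnlyLogging {

    /**
     * Applies one plugin and returns the INFO message to log, or null when the
     * plugin left the summaries unchanged. Veto handling is deliberately omitted.
     */
    static String messageIfChanged(
            String instanceName,
            Map<String, Integer> summaries,
            UnaryOperator<Map<String, Integer>> plugin) {
        // Snapshot first so that in-place mutation by the plugin is still detected.
        Map<String, Integer> before = new LinkedHashMap<>(summaries);
        Map<String, Integer> after = plugin.apply(summaries);
        if (before.equals(after)) {
            return null;
        }
        return String.format(
                "Scaling summaries changed by instance '%s': %s -> %s",
                instanceName, before, after);
    }

    public static void main(String[] args) {
        Map<String, Integer> summaries = new LinkedHashMap<>();
        summaries.put("vertex-a", 8);

        // A pass-through plugin produces no message.
        System.out.println(messageIfChanged("noop", summaries, m -> m));

        // A plugin that caps parallelism at 4 produces exactly one message.
        System.out.println(
                messageIfChanged(
                        "cap",
                        summaries,
                        m -> {
                            Map<String, Integer> capped = new LinkedHashMap<>(m);
                            capped.replaceAll((vertex, p) -> Math.min(p, 4));
                            return capped;
                        }));
    }
}
```

   Taking the snapshot before calling the plugin is a design choice: it keeps 
the comparison meaningful even if a plugin mutates the map it was handed.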



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutorPlugin.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A pluggable plugin that allows users to intercept, modify, or reject 
computed scaling decisions
+ * before they are applied. Implementations are invoked in the {@link 
ScalingExecutor} after scaling
+ * summaries have been computed and the blocked check has passed, but before 
the actual application
+ * of parallelism overrides.
+ *
+ * <p>Multiple plugin implementations can be chained. Each plugin receives the 
(possibly modified)
+ * output of the previous plugin. If any plugin returns an empty map, the 
scaling operation is
+ * vetoed entirely.
+ *
+ * <p>This was introduced as part of <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-575%3A+Scaling+Executor+Plugin+SPI+for+Flink+Autoscaler">FLIP-575:
+ * Scaling Executor Plugin SPI for Flink Autoscaler</a> and is complementary 
to <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-514%3A+Custom+Evaluator+plugin+for+Flink+Autoscaler">FLIP-514:
+ * Custom Evaluator plugin for Flink Autoscaler</a> which provides 
extensibility at the metric
+ * evaluation layer.
+ *
+ * <p>Implementations are discovered via Java's {@link 
java.util.ServiceLoader} mechanism. To
+ * register a custom plugin, add the fully qualified class name of the 
implementation to {@code
+ * META-INF/services/org.apache.flink.autoscaler.ScalingExecutorPlugin}.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+public interface ScalingExecutorPlugin<KEY, CTX extends 
JobAutoScalerContext<KEY>> {
+
+    /**
+     * Returns the priority of this plugin in the chain. Plugins with lower 
priority values are
+     * executed first. The default priority is 0. Plugins with equal priority 
have no guaranteed
+     * relative ordering.
+     *
+     * <p>Example usage: a resource-gating plugin (priority -100) should run 
before a
+     * parallelism-alignment plugin (priority 0), which should run before a 
cost-cap plugin
+     * (priority 100).
+     *
+     * @return the priority value; lower values execute first.
+     */
+    default int priority() {
+        return 0;
+    }
+
+    /**
+     * Applies the plugin logic to the computed scaling summaries before they 
are executed.
+     *
+     * <p>Implementations can:
+     *
+     * <ul>
+     *   <li>Return the summaries unmodified to approve the scaling decision 
as-is.
+     *   <li>Return a modified map (e.g., with adjusted parallelism values) to 
alter the decision.
+     *   <li>Return an empty map (via {@link Collections#emptyMap()}) to 
veto/reject the scaling
+     *       operation entirely.
+     * </ul>
+     *
+     * @param context The plugin context wrapping the autoscaler context, 
configuration, evaluated
+     *     metrics, and job topology.
+     * @param scalingSummaries The computed scaling summaries keyed by vertex 
ID. This map contains
+     *     only vertices that require a parallelism change.
+     * @return The (possibly modified) scaling summaries to apply, or an empty 
map to veto the
+     *     scaling.
+     */
+    Map<JobVertexID, ScalingSummary> apply(
+            Context<KEY, CTX> context, Map<JobVertexID, ScalingSummary> 
scalingSummaries);
+
+    /**
+     * Immutable context wrapping all inputs available to the scaling executor 
plugin, providing
+     * access to the autoscaler context, configuration, evaluated metrics, and 
job topology.
+     *
+     * @param <KEY> The job key.
+     * @param <CTX> Instance of {@link JobAutoScalerContext}.
+     */
+    @Value
+    class Context<KEY, CTX extends JobAutoScalerContext<KEY>> {
+
+        /** The autoscaler context for the current job. */
+        CTX autoScalerContext;

Review Comment:
   I'd keep the wrapped `autoScalerContext` field rather than extending the 
context. Wrapping keeps the plugin context generic and applicable to both the 
operator autoscaler and the standalone autoscaler, whereas extending it would 
introduce a lot of unnecessary complexity.
   
   Not extending the CTX is also easier to maintain whenever the CTX 
introduces new changes.
   
   As for doing the same in the evaluator plugin, I'd leave it as is. The 
evaluator runs at a different phase (metric evaluation), and the 
`ScalingMetricEvaluator` doesn't even receive the actual autoscaler context 
from `JobAutoScalerImpl`.
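   
   To illustrate the wrapping argument, here is a rough sketch under stated 
assumptions. `BaseJobContext`, `OperatorJobContext`, and 
`WrappingPluginContext` are hypothetical stand-ins (for `JobAutoScalerContext`, 
an operator-side subclass, and `ScalingExecutorPlugin.Context` respectively), 
and a plain `Map<String, String>` replaces Flink's `Configuration`.

```java
import java.util.Map;

/** Hypothetical stand-in for JobAutoScalerContext<KEY>. */
class BaseJobContext<KEY> {
    private final KEY jobKey;

    BaseJobContext(KEY jobKey) {
        this.jobKey = jobKey;
    }

    KEY getJobKey() {
        return jobKey;
    }
}

/** Hypothetical operator-side context that adds a Kubernetes-specific field. */
final class OperatorJobContext extends BaseJobContext<String> {
    private final String namespace;

    OperatorJobContext(String jobKey, String namespace) {
        super(jobKey);
        this.namespace = namespace;
    }

    String getNamespace() {
        return namespace;
    }
}

/** Wraps any context subtype instead of extending it, so one class serves every flavor. */
final class WrappingPluginContext<KEY, CTX extends BaseJobContext<KEY>> {
    private final CTX autoScalerContext;
    private final Map<String, String> pluginOptions;

    WrappingPluginContext(CTX autoScalerContext, Map<String, String> pluginOptions) {
        this.autoScalerContext = autoScalerContext;
        this.pluginOptions = Map.copyOf(pluginOptions); // immutable copy
    }

    CTX getAutoScalerContext() {
        return autoScalerContext;
    }

    Map<String, String> getPluginOptions() {
        return pluginOptions;
    }
}

final class WrappingDemo {
    public static void main(String[] args) {
        // Standalone flavor: the base context is wrapped as-is.
        WrappingPluginContext<String, BaseJobContext<String>> standalone =
                new WrappingPluginContext<>(new BaseJobContext<String>("job-1"), Map.of());

        // Operator flavor: the same wrapper keeps the subtype, so its extra getter stays visible.
        WrappingPluginContext<String, OperatorJobContext> operator =
                new WrappingPluginContext<>(
                        new OperatorJobContext("job-2", "flink"), Map.of("max", "4"));

        System.out.println(standalone.getAutoScalerContext().getJobKey());
        System.out.println(operator.getAutoScalerContext().getNamespace());
    }
}
```

   Because the wrapper is generic in `CTX`, fields added later to a context 
subclass need no change in the wrapper itself.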
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutorPlugin.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A pluggable plugin that allows users to intercept, modify, or reject 
computed scaling decisions
+ * before they are applied. Implementations are invoked in the {@link 
ScalingExecutor} after scaling
+ * summaries have been computed and the blocked check has passed, but before 
the actual application
+ * of parallelism overrides.
+ *
+ * <p>Multiple plugin implementations can be chained. Each plugin receives the 
(possibly modified)
+ * output of the previous plugin. If any plugin returns an empty map, the 
scaling operation is
+ * vetoed entirely.
+ *
+ * <p>This was introduced as part of <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-575%3A+Scaling+Executor+Plugin+SPI+for+Flink+Autoscaler">FLIP-575:
+ * Scaling Executor Plugin SPI for Flink Autoscaler</a> and is complementary 
to <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-514%3A+Custom+Evaluator+plugin+for+Flink+Autoscaler">FLIP-514:
+ * Custom Evaluator plugin for Flink Autoscaler</a> which provides 
extensibility at the metric
+ * evaluation layer.
+ *
+ * <p>Implementations are discovered via Java's {@link 
java.util.ServiceLoader} mechanism. To
+ * register a custom plugin, add the fully qualified class name of the 
implementation to {@code
+ * META-INF/services/org.apache.flink.autoscaler.ScalingExecutorPlugin}.
+ *
+ * @param <KEY> The job key.
+ * @param <CTX> Instance of {@link JobAutoScalerContext}.
+ */
+public interface ScalingExecutorPlugin<KEY, CTX extends 
JobAutoScalerContext<KEY>> {
+
+    /**
+     * Returns the priority of this plugin in the chain. Plugins with lower 
priority values are
+     * executed first. The default priority is 0. Plugins with equal priority 
have no guaranteed
+     * relative ordering.
+     *
+     * <p>Example usage: a resource-gating plugin (priority -100) should run 
before a
+     * parallelism-alignment plugin (priority 0), which should run before a 
cost-cap plugin
+     * (priority 100).
+     *
+     * @return the priority value; lower values execute first.
+     */
+    default int priority() {
+        return 0;
+    }
+
+    /**
+     * Applies the plugin logic to the computed scaling summaries before they 
are executed.
+     *
+     * <p>Implementations can:
+     *
+     * <ul>
+     *   <li>Return the summaries unmodified to approve the scaling decision 
as-is.
+     *   <li>Return a modified map (e.g., with adjusted parallelism values) to 
alter the decision.
+     *   <li>Return an empty map (via {@link Collections#emptyMap()}) to 
veto/reject the scaling
+     *       operation entirely.
+     * </ul>
+     *
+     * @param context The plugin context wrapping the autoscaler context, 
configuration, evaluated
+     *     metrics, and job topology.
+     * @param scalingSummaries The computed scaling summaries keyed by vertex 
ID. This map contains
+     *     only vertices that require a parallelism change.
+     * @return The (possibly modified) scaling summaries to apply, or an empty 
map to veto the
+     *     scaling.
+     */
+    Map<JobVertexID, ScalingSummary> apply(
+            Context<KEY, CTX> context, Map<JobVertexID, ScalingSummary> 
scalingSummaries);
+
+    /**
+     * Immutable context wrapping all inputs available to the scaling executor 
plugin, providing
+     * access to the autoscaler context, configuration, evaluated metrics, and 
job topology.
+     *
+     * @param <KEY> The job key.
+     * @param <CTX> Instance of {@link JobAutoScalerContext}.
+     */
+    @Value
+    class Context<KEY, CTX extends JobAutoScalerContext<KEY>> {
+
+        /** The autoscaler context for the current job. */
+        CTX autoScalerContext;
+
+        /**
+         * The prefix-stripped, per-plugin configuration for this invocation, 
populated from {@code
+         * job.autoscaler.scaling.custom-executor.<name>.<parameter>} entries 
(with the {@code
+         * kubernetes.operator.} legacy fallback honored). Never {@code null}; 
empty when the plugin
+         * has no per-instance options configured.
+         */
+        Configuration configuration;

Review Comment:
   This field isn't the job configuration that the autoscaler context 
contains; it holds the per-instance, prefix-stripped plugin options, built from 
`job.autoscaler.scaling.custom-executor.<name>.<parameter>`. It's intentionally 
distinct from `autoScalerContext.getConfiguration()`, which is the shared 
job-level config.
   
   It has to be instance-scoped because the executor supports multiple 
instances of the same plugin class registered under different `<name>`s, each 
receiving its own options.
   
   I considered merging it into the job config (as the evaluator plugin does), 
but the evaluator is single-instance, so a merged view is unambiguous there. 
For a multi-instance chain I'd rather keep the Configuration explicit.
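   
   As a rough illustration of what "per-instance, prefix-stripped" means, the 
sketch below filters a flat key/value view of the job config by instance name. 
It is not the implementation of 
`AutoScalerOptions.customScalingExecutorConfiguration`: the class 
`PerInstanceOptions`, the method `forInstance`, the instance names, and the 
`max-parallelism` parameter are hypothetical, a plain `Map<String, String>` 
replaces `Configuration`, and the `kubernetes.operator.` legacy fallback is not 
handled.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: extracts the options that belong to one named plugin instance. */
final class PerInstanceOptions {

    // Key prefix as given in the Javadoc under review.
    static final String PREFIX = "job.autoscaler.scaling.custom-executor.";

    /** Returns the entries under PREFIX + instanceName + "." with that prefix removed. */
    static Map<String, String> forInstance(Map<String, String> jobConf, String instanceName) {
        // The trailing dot keeps "cap" from matching keys of an instance named "cap-strict".
        String instancePrefix = PREFIX + instanceName + ".";
        Map<String, String> options = new TreeMap<>();
        for (Map.Entry<String, String> entry : jobConf.entrySet()) {
            if (entry.getKey().startsWith(instancePrefix)) {
                options.put(
                        entry.getKey().substring(instancePrefix.length()), entry.getValue());
            }
        }
        return options; // empty, never null, when nothing is configured
    }

    public static void main(String[] args) {
        Map<String, String> jobConf =
                Map.of(
                        PREFIX + "cap.max-parallelism", "4",
                        PREFIX + "cap-strict.max-parallelism", "2",
                        "job.autoscaler.enabled", "true");

        // Two instances of the same hypothetical plugin class each see only their own options.
        System.out.println(forInstance(jobConf, "cap"));
        System.out.println(forInstance(jobConf, "cap-strict"));
    }
}
```

   With a merged job-level view, both instances would read the same 
`max-parallelism` key, which is the ambiguity the explicit field avoids.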



