ctrlaltdilj commented on code in PR #1020:
URL: https://github.com/apache/flink-kubernetes-operator/pull/1020#discussion_r2298814205


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java:
##########
@@ -22,31 +22,59 @@
 import org.apache.flink.autoscaler.RestApiMetricsCollector;
 import org.apache.flink.autoscaler.ScalingExecutor;
 import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.realizer.FlinkAutoscalerScalingRealizer;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
 import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
 import 
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.PluginDiscoveryUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
 
 /** The factory of {@link JobAutoScaler}. */
 public class AutoscalerFactory {
 
+    private static Logger LOG = 
LoggerFactory.getLogger(AutoscalerFactory.class);
+
     public static JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> 
create(
             KubernetesClient client,
             EventRecorder eventRecorder,
-            ClusterResourceManager clusterResourceManager) {
+            ClusterResourceManager clusterResourceManager,
+            FlinkConfigManager configManager) {
 
         var stateStore = new KubernetesAutoScalerStateStore(new 
ConfigMapStore(client));
         var eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder);
 
+        Set<FlinkAutoscalerScalingRealizer> flinkAutoscalerScalingRealizers =
+                PluginDiscoveryUtils.discoverResources(
+                        configManager, FlinkAutoscalerScalingRealizer.class);
+
+        flinkAutoscalerScalingRealizers.forEach(
+                realizer -> {
+                    LOG.info(
+                            "Discovered resource from plugin directory {}",
+                            realizer.getClass().getName());
+                });
+
+        ScalingRealizer scalingRealizer = new KubernetesScalingRealizer();
+
+        if (!flinkAutoscalerScalingRealizers.isEmpty()) {
+            scalingRealizer = 
flinkAutoscalerScalingRealizers.stream().findFirst().get();

Review Comment:
   This uses the first scaling realizer discovered from the plugins, if any; otherwise the default `KubernetesScalingRealizer` is kept.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to