ctrlaltdilj commented on code in PR #1020: URL: https://github.com/apache/flink-kubernetes-operator/pull/1020#discussion_r2298835783
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java: ########## @@ -22,31 +22,59 @@ import org.apache.flink.autoscaler.RestApiMetricsCollector; import org.apache.flink.autoscaler.ScalingExecutor; import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.realizer.FlinkAutoscalerScalingRealizer; +import org.apache.flink.autoscaler.realizer.ScalingRealizer; import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore; import org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager; import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.PluginDiscoveryUtils; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; /** The factory of {@link JobAutoScaler}. 
*/ public class AutoscalerFactory { + private static Logger LOG = LoggerFactory.getLogger(AutoscalerFactory.class); + public static JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> create( KubernetesClient client, EventRecorder eventRecorder, - ClusterResourceManager clusterResourceManager) { + ClusterResourceManager clusterResourceManager, + FlinkConfigManager configManager) { var stateStore = new KubernetesAutoScalerStateStore(new ConfigMapStore(client)); var eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder); + Set<FlinkAutoscalerScalingRealizer> flinkAutoscalerScalingRealizers = + PluginDiscoveryUtils.discoverResources( + configManager, FlinkAutoscalerScalingRealizer.class); + + flinkAutoscalerScalingRealizers.forEach( + realizer -> { + LOG.info( + "Discovered resource from plugin directory {}", + realizer.getClass().getName()); + }); + + ScalingRealizer scalingRealizer = new KubernetesScalingRealizer(); + + if (!flinkAutoscalerScalingRealizers.isEmpty()) { + scalingRealizer = flinkAutoscalerScalingRealizers.stream().findFirst().get(); Review Comment: This assumes exactly one scaling realizer is discovered (`findFirst()` on the set just picks one if several are found); I'll harden this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org