This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 4408c6658cf1b3dc21c7b3362720eb5879760696
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Dec 1 08:38:14 2022 +0100

    [FLINK-30260][autoscaler] Integrate autoscaler components with reconciler 
mechanism
---
 docs/content/docs/custom-resource/reference.md        |  1 +
 .../kubernetes/operator/api/spec/FlinkVersion.java    |  3 ++-
 .../flink/kubernetes/operator/FlinkOperator.java      | 14 ++++++++++++--
 .../deployment/AbstractFlinkResourceReconciler.java   | 13 +++++++++++--
 .../reconciler/deployment/AbstractJobReconciler.java  |  6 ++++--
 .../reconciler/deployment/ApplicationReconciler.java  |  6 ++++--
 .../reconciler/deployment/ReconcilerFactory.java      | 12 +++++++++---
 .../reconciler/deployment/SessionReconciler.java      |  6 ++++--
 .../reconciler/sessionjob/SessionJobReconciler.java   |  6 ++++--
 .../operator/service/AbstractFlinkService.java        |  4 ++--
 .../kubernetes/operator/service/FlinkService.java     |  3 +++
 .../operator/utils/KubernetesClientUtils.java         | 19 +++++++++++++++++++
 .../kubernetes/operator/TestingFlinkService.java      |  2 +-
 .../controller/TestingFlinkDeploymentController.java  |  5 ++++-
 .../controller/TestingFlinkSessionJobController.java  |  5 ++++-
 .../sessionjob/FlinkSessionJobObserverTest.java       |  3 ++-
 .../deployment/ApplicationReconcilerTest.java         |  3 ++-
 .../ApplicationReconcilerUpgradeModeTest.java         |  3 ++-
 .../reconciler/deployment/SessionReconcilerTest.java  |  9 ++++++---
 .../sessionjob/SessionJobReconcilerTest.java          | 12 +++++++++---
 .../operator/service/NativeFlinkServiceTest.java      |  2 +-
 .../crds/flinkdeployments.flink.apache.org-v1.yml     |  1 +
 pom.xml                                               |  1 +
 23 files changed, 108 insertions(+), 31 deletions(-)

diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 8977e5e6..eaf4076c 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -82,6 +82,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | v1_14 |  |
 | v1_15 |  |
 | v1_16 |  |
+| v1_17 |  |
 
 ### IngressSpec
 **Class**: org.apache.flink.kubernetes.operator.api.spec.IngressSpec
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
index 8b388361..951e4514 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
@@ -26,7 +26,8 @@ public enum FlinkVersion {
     v1_13,
     v1_14,
     v1_15,
-    v1_16;
+    v1_16,
+    v1_17;
 
     public boolean isNewerVersionThan(FlinkVersion otherVersion) {
         return this.ordinal() > otherVersion.ordinal();
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index c6362cfb..adaa1562 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -143,7 +143,12 @@ public class FlinkOperator {
         var eventRecorder = EventRecorder.create(client, listeners);
         var reconcilerFactory =
                 new ReconcilerFactory(
-                        client, flinkServiceFactory, configManager, 
eventRecorder, statusRecorder);
+                        client,
+                        flinkServiceFactory,
+                        configManager,
+                        eventRecorder,
+                        statusRecorder,
+                        metricGroup);
         var observerFactory =
                 new FlinkDeploymentObserverFactory(
                         flinkServiceFactory, configManager, statusRecorder, 
eventRecorder);
@@ -167,7 +172,12 @@ public class FlinkOperator {
         var statusRecorder = StatusRecorder.create(client, metricManager, 
listeners);
         var reconciler =
                 new SessionJobReconciler(
-                        client, flinkServiceFactory, configManager, 
eventRecorder, statusRecorder);
+                        client,
+                        flinkServiceFactory,
+                        configManager,
+                        eventRecorder,
+                        statusRecorder,
+                        metricGroup);
         var observer =
                 new FlinkSessionJobObserver(flinkServiceFactory, 
configManager, eventRecorder);
         var controller =
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index ea6a3048..94d154a1 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -30,8 +30,10 @@ import 
org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.autoscaler.JobAutoScaler;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import 
org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
@@ -72,6 +74,7 @@ public abstract class AbstractFlinkResourceReconciler<
     protected final EventRecorder eventRecorder;
     protected final StatusRecorder<CR, STATUS> statusRecorder;
     protected final KubernetesClient kubernetesClient;
+    protected final JobAutoScaler resourceScaler;
 
     public static final String MSG_SUSPENDED = "Suspending existing 
deployment.";
     public static final String MSG_SPEC_CHANGED =
@@ -85,11 +88,14 @@ public abstract class AbstractFlinkResourceReconciler<
             KubernetesClient kubernetesClient,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<CR, STATUS> statusRecorder) {
+            StatusRecorder<CR, STATUS> statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
         this.kubernetesClient = kubernetesClient;
         this.configManager = configManager;
         this.eventRecorder = eventRecorder;
         this.statusRecorder = statusRecorder;
+        this.resourceScaler =
+                JobAutoScaler.create(kubernetesClient, configManager, 
operatorMetricGroup);
     }
 
     @Override
@@ -176,7 +182,9 @@ public abstract class AbstractFlinkResourceReconciler<
                     MSG_ROLLBACK);
             rollback(cr, ctx, observeConfig);
         } else if (!reconcileOtherChanges(cr, ctx, observeConfig)) {
-            LOG.info("Resource fully reconciled, nothing to do...");
+            if (!resourceScaler.scale(cr, flinkService, observeConfig, ctx)) {
+                LOG.info("Resource fully reconciled, nothing to do...");
+            }
         }
     }
 
@@ -253,6 +261,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
     @Override
     public final DeleteControl cleanup(CR resource, Context<?> context) {
+        resourceScaler.cleanup(resource);
         return cleanupInternal(resource, context);
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 3b777e4e..33b28413 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.kubernetes.operator.api.status.JobStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -58,8 +59,9 @@ public abstract class AbstractJobReconciler<
             KubernetesClient kubernetesClient,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<CR, STATUS> statusRecorder) {
-        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+            StatusRecorder<CR, STATUS> statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder, 
operatorMetricGroup);
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index ee40da92..54463dc6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
 import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
+import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -69,8 +70,9 @@ public class ApplicationReconciler
             FlinkService flinkService,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder) {
-        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder, 
operatorMetricGroup);
         this.flinkService = flinkService;
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index bcca7fd1..d5a852b5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.Mode;
+import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -41,6 +42,7 @@ public class ReconcilerFactory {
     private final FlinkConfigManager configManager;
     private final EventRecorder eventRecorder;
     private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
deploymentStatusRecorder;
+    private final KubernetesOperatorMetricGroup operatorMetricGroup;
     private final Map<Tuple2<Mode, KubernetesDeploymentMode>, 
Reconciler<FlinkDeployment>>
             reconcilerMap;
 
@@ -49,12 +51,14 @@ public class ReconcilerFactory {
             FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
deploymentStatusRecorder) {
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
deploymentStatusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
         this.kubernetesClient = kubernetesClient;
         this.flinkServiceFactory = flinkServiceFactory;
         this.configManager = configManager;
         this.eventRecorder = eventRecorder;
         this.deploymentStatusRecorder = deploymentStatusRecorder;
+        this.operatorMetricGroup = operatorMetricGroup;
         this.reconcilerMap = new ConcurrentHashMap<>();
     }
 
@@ -71,14 +75,16 @@ public class ReconcilerFactory {
                                     flinkServiceFactory.getOrCreate(flinkApp),
                                     configManager,
                                     eventRecorder,
-                                    deploymentStatusRecorder);
+                                    deploymentStatusRecorder,
+                                    operatorMetricGroup);
                         case APPLICATION:
                             return new ApplicationReconciler(
                                     kubernetesClient,
                                     flinkServiceFactory.getOrCreate(flinkApp),
                                     configManager,
                                     eventRecorder,
-                                    deploymentStatusRecorder);
+                                    deploymentStatusRecorder,
+                                    operatorMetricGroup);
                         default:
                             throw new UnsupportedOperationException(
                                     String.format("Unsupported running mode: 
%s", modes.f0));
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 20f945e6..24e7ecea 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatu
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -61,8 +62,9 @@ public class SessionReconciler
             FlinkService flinkService,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder) {
-        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder, 
operatorMetricGroup);
         this.flinkService = flinkService;
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index ad5c5735..1c1de979 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
@@ -52,8 +53,9 @@ public class SessionJobReconciler
             FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> 
statusRecorder) {
-        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> 
statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder, 
operatorMetricGroup);
         this.flinkServiceFactory = flinkServiceFactory;
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index fdf6991e..933ab745 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -647,8 +647,8 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                         .toSeconds());
     }
 
-    @VisibleForTesting
-    protected ClusterClient<String> getClusterClient(Configuration conf) 
throws Exception {
+    @Override
+    public ClusterClient<String> getClusterClient(Configuration conf) throws 
Exception {
         final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
         final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
         final int port = conf.getInteger(RestOptions.PORT);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index ae3fb25b..6c75d127 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.service;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
@@ -102,4 +103,6 @@ public interface FlinkService {
 
     Map<String, String> getMetrics(Configuration conf, String jobId, 
List<String> metricNames)
             throws Exception;
+
+    ClusterClient<String> getClusterClient(Configuration conf) throws 
Exception;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
index 119a3db8..46002af5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics;
 import org.apache.flink.metrics.MetricGroup;
@@ -28,10 +29,14 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientBuilder;
 import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory;
 import okhttp3.OkHttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Kubernetes client utils. */
 public class KubernetesClientUtils {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesClientUtils.class);
+
     public static KubernetesClient getKubernetesClient(
             FlinkOperatorConfiguration operatorConfig, MetricGroup 
metricGroup) {
         return getKubernetesClient(operatorConfig, metricGroup, null);
@@ -63,4 +68,18 @@ public class KubernetesClientUtils {
 
         return clientBuilder.build();
     }
+
+    public static void replaceSpecAfterScaling(
+            KubernetesClient kubernetesClient, AbstractFlinkResource<?, ?> cr) 
{
+        var inKube = kubernetesClient.resource(cr).get();
+
+        if (cr.getMetadata().getGeneration() == 
inKube.getMetadata().getGeneration()) {
+            kubernetesClient
+                    .resource(cr)
+                    
.lockResourceVersion(inKube.getMetadata().getResourceVersion())
+                    .replace();
+        } else {
+            LOG.info("Spec already upgrading in kube, skipping scale 
operation.");
+        }
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 9f654409..22efa1f0 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -286,7 +286,7 @@ public class TestingFlinkService extends 
AbstractFlinkService {
     }
 
     @Override
-    protected ClusterClient<String> getClusterClient(Configuration config) 
throws Exception {
+    public ClusterClient<String> getClusterClient(Configuration config) throws 
Exception {
         TestingClusterClient<String> clusterClient = new 
TestingClusterClient<>(config);
         FlinkVersion flinkVersion = 
config.get(FlinkConfigBuilder.FLINK_VERSION);
         clusterClient.setListJobsFunction(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 19a924db..1a7151cc 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -82,7 +84,8 @@ public class TestingFlinkDeploymentController
                                 flinkServiceFactory,
                                 configManager,
                                 eventRecorder,
-                                statusRecorder),
+                                statusRecorder,
+                                TestUtils.createTestMetricGroup(new 
Configuration())),
                         new FlinkDeploymentObserverFactory(
                                 flinkServiceFactory, configManager, 
statusRecorder, eventRecorder),
                         statusRecorder,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
index 116e147f..e80374ee 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
@@ -82,7 +84,8 @@ public class TestingFlinkSessionJobController
                                 flinkServiceFactory,
                                 configManager,
                                 eventRecorder,
-                                statusRecorder),
+                                statusRecorder,
+                                TestUtils.createTestMetricGroup(new 
Configuration())),
                         new FlinkSessionJobObserver(
                                 flinkServiceFactory, configManager, 
eventRecorder),
                         statusRecorder,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
index 9c426861..d9c057d7 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
@@ -79,7 +79,8 @@ public class FlinkSessionJobObserverTest {
                         flinkServiceFactory,
                         configManager,
                         eventRecorder,
-                        statusRecorder);
+                        statusRecorder,
+                        TestUtils.createTestMetricGroup(new Configuration()));
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 6c2f6ee2..a4f4acbc 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -112,7 +112,8 @@ public class ApplicationReconcilerTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        statusRecorder);
+                        statusRecorder,
+                        TestUtils.createTestMetricGroup(new Configuration()));
     }
 
     @ParameterizedTest
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 2445741f..0e8549a0 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -80,7 +80,8 @@ public class ApplicationReconcilerUpgradeModeTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        statusRecoder);
+                        statusRecoder,
+                        TestUtils.createTestMetricGroup(new Configuration()));
     }
 
     @ParameterizedTest
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index f5ee07b7..1b0b6b07 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -80,7 +80,8 @@ public class SessionReconcilerTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        new TestingStatusRecorder<>());
+                        new TestingStatusRecorder<>(),
+                        TestUtils.createTestMetricGroup(new Configuration()));
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
         kubernetesClient.resource(deployment).createOrReplace();
         reconciler.reconcile(deployment, flinkService.getContext());
@@ -96,7 +97,8 @@ public class SessionReconcilerTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        new TestingStatusRecorder<>());
+                        new TestingStatusRecorder<>(),
+                        TestUtils.createTestMetricGroup(new Configuration()));
 
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
         kubernetesClient.resource(deployment).createOrReplace();
@@ -145,7 +147,8 @@ public class SessionReconcilerTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        new TestingStatusRecorder<>());
+                        new TestingStatusRecorder<>(),
+                        TestUtils.createTestMetricGroup(new Configuration()));
 
         FlinkDeployment flinkApp = TestUtils.buildApplicationCluster();
         ObjectMeta deployMeta = flinkApp.getMetadata();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index b9309c9f..f6d8e81f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -107,14 +107,15 @@ public class SessionJobReconcilerTest {
                     }
                 };
         statusRecoder = new TestingStatusRecorder<>();
+        
kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
         reconciler =
                 new SessionJobReconciler(
                         kubernetesClient,
                         flinkServiceFactory,
                         configManager,
                         eventRecorder,
-                        statusRecoder);
-        
kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
+                        statusRecoder,
+                        TestUtils.createTestMetricGroup(new Configuration()));
     }
 
     @Test
@@ -610,7 +611,12 @@ public class SessionJobReconcilerTest {
         // Force upgrade when savepoint is in progress.
         reconciler =
                 new SessionJobReconciler(
-                        null, flinkServiceFactory, configManager, 
eventRecorder, statusRecoder);
+                        null,
+                        flinkServiceFactory,
+                        configManager,
+                        eventRecorder,
+                        statusRecoder,
+                        TestUtils.createTestMetricGroup(new Configuration()));
         spSessionJob.getSpec().getJob().setParallelism(100);
         reconciler.reconcile(spSessionJob, readyContext);
         assertEquals(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 234a3e33..a0ba485c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -502,7 +502,7 @@ public class NativeFlinkServiceTest {
     private FlinkService createFlinkService(ClusterClient<String> 
clusterClient) {
         return new NativeFlinkService(client, new 
FlinkConfigManager(configuration)) {
             @Override
-            protected ClusterClient<String> getClusterClient(Configuration 
config) {
+            public ClusterClient<String> getClusterClient(Configuration 
config) {
                 return clusterClient;
             }
         };
diff --git 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index ef023346..5212526f 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -40,6 +40,7 @@ spec:
                 - v1_14
                 - v1_15
                 - v1_16
+                - v1_17
                 type: string
               ingress:
                 properties:
diff --git a/pom.xml b/pom.xml
index d1de2e4d..f3576a7b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@ under the License.
         <module>flink-kubernetes-docs</module>
         <module>examples/flink-sql-runner-example</module>
         <module>examples/kubernetes-client-examples</module>
+        <module>examples/autoscaling</module>
     </modules>
 
     <properties>


Reply via email to