This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 4408c6658cf1b3dc21c7b3362720eb5879760696 Author: Gyula Fora <[email protected]> AuthorDate: Thu Dec 1 08:38:14 2022 +0100 [FLINK-30260][autoscaler] Integrate autoscaler components with reconciler mechanism --- docs/content/docs/custom-resource/reference.md | 1 + .../kubernetes/operator/api/spec/FlinkVersion.java | 3 ++- .../flink/kubernetes/operator/FlinkOperator.java | 14 ++++++++++++-- .../deployment/AbstractFlinkResourceReconciler.java | 13 +++++++++++-- .../reconciler/deployment/AbstractJobReconciler.java | 6 ++++-- .../reconciler/deployment/ApplicationReconciler.java | 6 ++++-- .../reconciler/deployment/ReconcilerFactory.java | 12 +++++++++--- .../reconciler/deployment/SessionReconciler.java | 6 ++++-- .../reconciler/sessionjob/SessionJobReconciler.java | 6 ++++-- .../operator/service/AbstractFlinkService.java | 4 ++-- .../kubernetes/operator/service/FlinkService.java | 3 +++ .../operator/utils/KubernetesClientUtils.java | 19 +++++++++++++++++++ .../kubernetes/operator/TestingFlinkService.java | 2 +- .../controller/TestingFlinkDeploymentController.java | 5 ++++- .../controller/TestingFlinkSessionJobController.java | 5 ++++- .../sessionjob/FlinkSessionJobObserverTest.java | 3 ++- .../deployment/ApplicationReconcilerTest.java | 3 ++- .../ApplicationReconcilerUpgradeModeTest.java | 3 ++- .../reconciler/deployment/SessionReconcilerTest.java | 9 ++++++--- .../sessionjob/SessionJobReconcilerTest.java | 12 +++++++++--- .../operator/service/NativeFlinkServiceTest.java | 2 +- .../crds/flinkdeployments.flink.apache.org-v1.yml | 1 + pom.xml | 1 + 23 files changed, 108 insertions(+), 31 deletions(-) diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md index 8977e5e6..eaf4076c 100644 --- a/docs/content/docs/custom-resource/reference.md +++ b/docs/content/docs/custom-resource/reference.md @@ -82,6 +82,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit | v1_14 | | | v1_15 | | | v1_16 | | +| v1_17 | | ### IngressSpec **Class**: org.apache.flink.kubernetes.operator.api.spec.IngressSpec diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java index 8b388361..951e4514 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java @@ -26,7 +26,8 @@ public enum FlinkVersion { v1_13, v1_14, v1_15, - v1_16; + v1_16, + v1_17; public boolean isNewerVersionThan(FlinkVersion otherVersion) { return this.ordinal() > otherVersion.ordinal(); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index c6362cfb..adaa1562 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -143,7 +143,12 @@ public class FlinkOperator { var eventRecorder = EventRecorder.create(client, listeners); var reconcilerFactory = new ReconcilerFactory( - client, flinkServiceFactory, configManager, eventRecorder, statusRecorder); + client, + flinkServiceFactory, + configManager, + eventRecorder, + statusRecorder, + metricGroup); var observerFactory = new FlinkDeploymentObserverFactory( flinkServiceFactory, configManager, statusRecorder, eventRecorder); @@ -167,7 +172,12 @@ public class FlinkOperator { var statusRecorder = StatusRecorder.create(client, metricManager, listeners); var reconciler = new SessionJobReconciler( - client, flinkServiceFactory, configManager, eventRecorder, statusRecorder); + client, + flinkServiceFactory, + configManager, + eventRecorder, + statusRecorder, + metricGroup); var observer = new FlinkSessionJobObserver(flinkServiceFactory, configManager, eventRecorder); var controller = diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index ea6a3048..94d154a1 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -30,8 +30,10 @@ import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.status.CommonStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.ReconciliationState; +import org.apache.flink.kubernetes.operator.autoscaler.JobAutoScaler; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; +import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup; import org.apache.flink.kubernetes.operator.reconciler.Reconciler; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder; @@ -72,6 +74,7 @@ public abstract class AbstractFlinkResourceReconciler< protected final EventRecorder eventRecorder; protected final StatusRecorder<CR, STATUS> statusRecorder; protected final KubernetesClient kubernetesClient; + protected final JobAutoScaler resourceScaler; public static final String MSG_SUSPENDED = "Suspending existing deployment."; public static final String MSG_SPEC_CHANGED = @@ -85,11 +88,14 @@ public abstract class AbstractFlinkResourceReconciler< KubernetesClient kubernetesClient, FlinkConfigManager configManager, EventRecorder eventRecorder, - StatusRecorder<CR, STATUS> statusRecorder) { + StatusRecorder<CR, STATUS> statusRecorder, + KubernetesOperatorMetricGroup operatorMetricGroup) { this.kubernetesClient = kubernetesClient; this.configManager = configManager; this.eventRecorder = eventRecorder; this.statusRecorder = statusRecorder; + this.resourceScaler = + JobAutoScaler.create(kubernetesClient, configManager, operatorMetricGroup); } @Override @@ -176,7 +182,9 @@ public abstract class AbstractFlinkResourceReconciler< MSG_ROLLBACK); rollback(cr, ctx, observeConfig); } else if (!reconcileOtherChanges(cr, ctx, observeConfig)) { - LOG.info("Resource fully reconciled, nothing to do..."); + if (!resourceScaler.scale(cr, flinkService, observeConfig, ctx)) { + LOG.info("Resource fully reconciled, nothing to do..."); + } } } @@ -253,6 +261,7 @@ public abstract class AbstractFlinkResourceReconciler< @Override public final DeleteControl cleanup(CR resource, Context<?> context) { + resourceScaler.cleanup(resource); return cleanupInternal(resource, context); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java index 3b777e4e..33b28413 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java @@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.api.status.JobStatus; import org.apache.flink.kubernetes.operator.api.status.ReconciliationState; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; +import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.SavepointUtils; @@ -58,8 +59,9 @@ public abstract class AbstractJobReconciler< KubernetesClient kubernetesClient, FlinkConfigManager configManager, EventRecorder eventRecorder, - StatusRecorder<CR, STATUS> statusRecorder) { - super(kubernetesClient, configManager, eventRecorder, statusRecorder); + StatusRecorder<CR, STATUS> statusRecorder, + KubernetesOperatorMetricGroup operatorMetricGroup) { + super(kubernetesClient, configManager, eventRecorder, statusRecorder, operatorMetricGroup); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index ee40da92..54463dc6 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException; import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo; +import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup; import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -69,8 +70,9 @@ public class ApplicationReconciler FlinkService flinkService, FlinkConfigManager configManager, EventRecorder eventRecorder, - StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) { - super(kubernetesClient, configManager, eventRecorder, statusRecorder); + StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder, + KubernetesOperatorMetricGroup operatorMetricGroup) { + super(kubernetesClient, configManager, eventRecorder, statusRecorder, operatorMetricGroup); this.flinkService = flinkService; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java index bcca7fd1..d5a852b5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.Mode; +import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup; import org.apache.flink.kubernetes.operator.reconciler.Reconciler; import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -41,6 +42,7 @@ public class ReconcilerFactory { private final FlinkConfigManager configManager; private final EventRecorder eventRecorder; private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder; + private final KubernetesOperatorMetricGroup operatorMetricGroup; private final Map<Tuple2<Mode, KubernetesDeploymentMode>, Reconciler<FlinkDeployment>> reconcilerMap; @@ -49,12 +51,14 @@ public class ReconcilerFactory { FlinkServiceFactory flinkServiceFactory, FlinkConfigManager configManager, EventRecorder eventRecorder, - StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder) { + StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder, + KubernetesOperatorMetricGroup operatorMetricGroup) { this.kubernetesClient = kubernetesClient; this.flinkServiceFactory = flinkServiceFactory; this.configManager = configManager; this.eventRecorder = eventRecorder; this.deploymentStatusRecorder = deploymentStatusRecorder; + this.operatorMetricGroup = operatorMetricGroup; this.reconcilerMap = new ConcurrentHashMap<>(); } @@ -71,14 +75,16 @@ public class ReconcilerFactory { flinkServiceFactory.getOrCreate(flinkApp), configManager, eventRecorder, - deploymentStatusRecorder); + deploymentStatusRecorder, + operatorMetricGroup); case APPLICATION: return new ApplicationReconciler( kubernetesClient, flinkServiceFactory.getOrCreate(flinkApp), configManager, eventRecorder, - deploymentStatusRecorder); + deploymentStatusRecorder, + operatorMetricGroup); default: throw new UnsupportedOperationException( String.format("Unsupported running mode: %s", modes.f0)); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index 20f945e6..24e7ecea 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatu import org.apache.flink.kubernetes.operator.api.status.ReconciliationState; import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -61,8 +62,9 @@ public class SessionReconciler FlinkService flinkService, FlinkConfigManager configManager, EventRecorder eventRecorder, - StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) { - super(kubernetesClient, configManager, eventRecorder, statusRecorder); + StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder, + KubernetesOperatorMetricGroup operatorMetricGroup) { + super(kubernetesClient, configManager, eventRecorder, statusRecorder, operatorMetricGroup); this.flinkService = flinkService; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java index ad5c5735..1c1de979 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java @@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup; import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory; @@ -52,8 +53,9 @@ public class SessionJobReconciler FlinkServiceFactory flinkServiceFactory, FlinkConfigManager configManager, EventRecorder eventRecorder, - StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder) { - super(kubernetesClient, configManager, eventRecorder, statusRecorder); + StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder, + KubernetesOperatorMetricGroup operatorMetricGroup) { + super(kubernetesClient, configManager, eventRecorder, statusRecorder, operatorMetricGroup); this.flinkServiceFactory = flinkServiceFactory; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index fdf6991e..933ab745 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -647,8 +647,8 @@ public abstract class AbstractFlinkService implements FlinkService { .toSeconds()); } - @VisibleForTesting - protected ClusterClient<String> getClusterClient(Configuration conf) throws Exception { + @Override + public ClusterClient<String> getClusterClient(Configuration conf) throws Exception { final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID); final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE); final int port = conf.getInteger(RestOptions.PORT); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index ae3fb25b..6c75d127 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.service; import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; @@ -102,4 +103,6 @@ public interface FlinkService { Map<String, String> getMetrics(Configuration conf, String jobId, List<String> metricNames) throws Exception; + + ClusterClient<String> getClusterClient(Configuration conf) throws Exception; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java index 119a3db8..46002af5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.utils; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics; import org.apache.flink.metrics.MetricGroup; @@ -28,10 +29,14 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory; import okhttp3.OkHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Kubernetes client utils. */ public class KubernetesClientUtils { + private static final Logger LOG = LoggerFactory.getLogger(KubernetesClientUtils.class); + public static KubernetesClient getKubernetesClient( FlinkOperatorConfiguration operatorConfig, MetricGroup metricGroup) { return getKubernetesClient(operatorConfig, metricGroup, null); @@ -63,4 +68,18 @@ public class KubernetesClientUtils { return clientBuilder.build(); } + + public static void replaceSpecAfterScaling( + KubernetesClient kubernetesClient, AbstractFlinkResource<?, ?> cr) { + var inKube = kubernetesClient.resource(cr).get(); + + if (cr.getMetadata().getGeneration() == inKube.getMetadata().getGeneration()) { + kubernetesClient + .resource(cr) + .lockResourceVersion(inKube.getMetadata().getResourceVersion()) + .replace(); + } else { + LOG.info("Spec already upgrading in kube, skipping scale operation."); + } + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 9f654409..22efa1f0 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -286,7 +286,7 @@ public class TestingFlinkService extends AbstractFlinkService { } @Override - protected ClusterClient<String> getClusterClient(Configuration config) throws Exception { + public ClusterClient<String> getClusterClient(Configuration config) throws Exception { TestingClusterClient<String> clusterClient = new TestingClusterClient<>(config); FlinkVersion flinkVersion = config.get(FlinkConfigBuilder.FLINK_VERSION); clusterClient.setListJobsFunction( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java index 19a924db..1a7151cc 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -17,6 +17,8 @@ package org.apache.flink.kubernetes.operator.controller; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; @@ -82,7 +84,8 @@ public class TestingFlinkDeploymentController flinkServiceFactory, configManager, eventRecorder, - statusRecorder), + statusRecorder, + TestUtils.createTestMetricGroup(new Configuration())), new FlinkDeploymentObserverFactory( flinkServiceFactory, configManager, statusRecorder, eventRecorder), statusRecorder, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java index 116e147f..e80374ee 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java @@ -17,6 +17,8 @@ package org.apache.flink.kubernetes.operator.controller; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; @@ -82,7 +84,8 @@ public class TestingFlinkSessionJobController flinkServiceFactory, configManager, eventRecorder, - statusRecorder), + statusRecorder, + TestUtils.createTestMetricGroup(new Configuration())), new FlinkSessionJobObserver( flinkServiceFactory, configManager, eventRecorder), statusRecorder, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java index 9c426861..d9c057d7 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java @@ -79,7 +79,8 @@ public class FlinkSessionJobObserverTest { flinkServiceFactory, configManager, eventRecorder, - statusRecorder); + statusRecorder, + TestUtils.createTestMetricGroup(new Configuration())); } @Test diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index 6c2f6ee2..a4f4acbc 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -112,7 +112,8 @@ public class ApplicationReconcilerTest { flinkService, configManager, eventRecorder, - statusRecorder); + statusRecorder, + TestUtils.createTestMetricGroup(new Configuration())); } @ParameterizedTest diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java index 2445741f..0e8549a0 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java @@ -80,7 +80,8 @@ public class ApplicationReconcilerUpgradeModeTest { flinkService, configManager, eventRecorder, - statusRecoder); + statusRecoder, + TestUtils.createTestMetricGroup(new Configuration())); } @ParameterizedTest diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java index f5ee07b7..1b0b6b07 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java @@ -80,7 +80,8 @@ public class SessionReconcilerTest { flinkService, configManager, eventRecorder, - new TestingStatusRecorder<>()); + new TestingStatusRecorder<>(), + TestUtils.createTestMetricGroup(new Configuration())); FlinkDeployment deployment = TestUtils.buildSessionCluster(); kubernetesClient.resource(deployment).createOrReplace(); reconciler.reconcile(deployment, flinkService.getContext()); @@ -96,7 +97,8 @@ public class SessionReconcilerTest { flinkService, configManager, eventRecorder, - new TestingStatusRecorder<>()); + new TestingStatusRecorder<>(), + TestUtils.createTestMetricGroup(new Configuration())); FlinkDeployment deployment = TestUtils.buildSessionCluster(); kubernetesClient.resource(deployment).createOrReplace(); @@ -145,7 +147,8 @@ public class SessionReconcilerTest { flinkService, configManager, eventRecorder, - new TestingStatusRecorder<>()); + new TestingStatusRecorder<>(), + TestUtils.createTestMetricGroup(new Configuration())); FlinkDeployment flinkApp = TestUtils.buildApplicationCluster(); ObjectMeta deployMeta = flinkApp.getMetadata(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java index b9309c9f..f6d8e81f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java @@ -107,14 +107,15 @@ public class SessionJobReconcilerTest { } }; statusRecoder = new TestingStatusRecorder<>(); + kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace(); reconciler = new SessionJobReconciler( kubernetesClient, flinkServiceFactory, configManager, eventRecorder, - statusRecoder); - kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace(); + statusRecoder, + TestUtils.createTestMetricGroup(new Configuration())); } @Test @@ -610,7 +611,12 @@ public class SessionJobReconcilerTest { // Force upgrade when savepoint is in progress. reconciler = new SessionJobReconciler( - null, flinkServiceFactory, configManager, eventRecorder, statusRecoder); + null, + flinkServiceFactory, + configManager, + eventRecorder, + statusRecoder, + TestUtils.createTestMetricGroup(new Configuration())); spSessionJob.getSpec().getJob().setParallelism(100); reconciler.reconcile(spSessionJob, readyContext); assertEquals( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java index 234a3e33..a0ba485c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java @@ -502,7 +502,7 @@ public class NativeFlinkServiceTest { private FlinkService createFlinkService(ClusterClient<String> clusterClient) { return new NativeFlinkService(client, new FlinkConfigManager(configuration)) { @Override - protected ClusterClient<String> getClusterClient(Configuration config) { + public ClusterClient<String> getClusterClient(Configuration config) { return clusterClient; } }; diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml index ef023346..5212526f 100644 --- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -40,6 +40,7 @@ spec: - v1_14 - v1_15 - v1_16 + - v1_17 type: string ingress: properties: diff --git a/pom.xml b/pom.xml index d1de2e4d..f3576a7b 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,7 @@ under the License. <module>flink-kubernetes-docs</module> <module>examples/flink-sql-runner-example</module> <module>examples/kubernetes-client-examples</module> + <module>examples/autoscaling</module> </modules> <properties>
