This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 1dd9850  [FLINK-27685] Add scale subresource and basic HPA support
1dd9850 is described below

commit 1dd9850fb37d4f7d572f46b247470bda13fb3660
Author: Gyula Fora <[email protected]>
AuthorDate: Fri Jun 10 12:19:26 2022 +0200

    [FLINK-27685] Add scale subresource and basic HPA support
---
 docs/content/docs/custom-resource/reference.md     | 12 +++++++
 examples/hpa/basic-hpa.yaml                        | 37 ++++++++++++++++++++++
 .../operator/config/FlinkConfigBuilder.java        | 17 +++++++---
 .../operator/crd/spec/TaskManagerSpec.java         |  4 +++
 .../operator/crd/status/FlinkDeploymentStatus.java |  3 ++
 .../TaskManagerInfo.java}                          | 19 ++++++-----
 .../operator/reconciler/ReconciliationUtils.java   | 29 ++++++++++++++---
 .../deployment/AbstractDeploymentReconciler.java   |  6 ++--
 .../deployment/ApplicationReconciler.java          |  8 +++--
 .../reconciler/deployment/SessionReconciler.java   | 13 ++++----
 .../sessionjob/FlinkSessionJobReconciler.java      |  4 +--
 .../kubernetes/operator/utils/FlinkUtils.java      |  8 +++++
 .../operator/validation/DefaultValidator.java      | 25 +++++++++++----
 .../flink/kubernetes/operator/TestUtils.java       |  2 +-
 .../operator/config/FlinkConfigBuilderTest.java    | 15 ++++++++-
 .../operator/config/FlinkConfigManagerTest.java    |  3 +-
 .../controller/FlinkDeploymentControllerTest.java  |  6 ++++
 .../operator/service/FlinkServiceTest.java         | 12 ++++---
 .../kubernetes/operator/utils/FlinkUtilsTest.java  | 19 +++++++++++
 .../operator/utils/ReconciliationUtilsTest.java    |  3 +-
 .../operator/validation/DefaultValidatorTest.java  | 17 ++++++++--
 .../crds/flinkdeployments.flink.apache.org-v1.yml  | 13 ++++++++
 22 files changed, 228 insertions(+), 47 deletions(-)

diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 7f44e83..100db9d 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -149,6 +149,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | Parameter | Type | Docs |
 | ----------| ---- | ---- |
 | resource | org.apache.flink.kubernetes.operator.crd.spec.Resource | Resource 
specification for the TaskManager pods. |
+| replicas | java.lang.Integer | Number of TaskManager replicas. If defined, 
takes precedence over parallelism |
 | podTemplate | io.fabric8.kubernetes.api.model.Pod | TaskManager pod 
template. It will be merged with FlinkDeploymentSpec.podTemplate. |
 
 ### UpgradeMode
@@ -188,6 +189,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | clusterInfo | java.util.Map<java.lang.String,java.lang.String> | Config 
information from running clusters. |
 | jobManagerDeploymentStatus | 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus | 
Last observed status of the JobManager deployment. |
 | reconciliationStatus | 
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentReconciliationStatus
 | Status of the last reconcile operation. |
+| taskManager | 
org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo | Information 
about the TaskManagers for the scale subresource. |
 
 ### FlinkSessionJobReconciliationStatus
 **Class**: 
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobReconciliationStatus
@@ -287,3 +289,13 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | PERIODIC | Savepoint periodically triggered by the operator. |
 | UPGRADE | Savepoint triggered during stateful upgrade. |
 | UNKNOWN | Savepoint trigger mechanism unknown, such as savepoint retrieved 
directly from Flink job. |
+
+### TaskManagerInfo
+**Class**: org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo
+
+**Description**: Information about the TaskManagers for the scale subresource.
+
+| Parameter | Type | Docs |
+| ----------| ---- | ---- |
+| labelSelector | java.lang.String | TaskManager label selector. |
+| replicas | int | Number of TaskManager replicas if defined in the spec. |
diff --git a/examples/hpa/basic-hpa.yaml b/examples/hpa/basic-hpa.yaml
new file mode 100644
index 0000000..e35be9c
--- /dev/null
+++ b/examples/hpa/basic-hpa.yaml
@@ -0,0 +1,37 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: autoscaling/v2
+kind: HorizontalPodAutoscaler
+metadata:
+  name: basic-hpa
+  namespace: default
+spec:
+  minReplicas: 1
+  maxReplicas: 3
+  metrics:
+  - type: Resource
+    resource:
+      name: memory
+      target:
+        type: AverageValue
+        averageValue: 10Mi
+  scaleTargetRef:
+    apiVersion: flink.apache.org/v1beta1
+    kind: FlinkDeployment
+    name: basic
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 939dee1..2dfd9fd 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -226,11 +226,8 @@ public class FlinkConfigBuilder {
                     DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.APPLICATION.getName());
             final URI uri = new URI(spec.getJob().getJarURI());
             effectiveConfig.set(PipelineOptions.JARS, 
Collections.singletonList(uri.toString()));
+            effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, 
getParallelism());
 
-            if (spec.getJob().getParallelism() > 0) {
-                effectiveConfig.set(
-                        CoreOptions.DEFAULT_PARALLELISM, 
spec.getJob().getParallelism());
-            }
             if (spec.getJob().getAllowNonRestoredState() != null) {
                 effectiveConfig.set(
                         
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
@@ -243,6 +240,18 @@ public class FlinkConfigBuilder {
         return this;
     }
 
+    /**
+     * Computes the default parallelism to configure for the application job.
+     *
+     * <p>If TaskManager replicas are set in the spec, the parallelism is derived as replicas
+     * multiplied by {@code TaskManagerOptions.NUM_TASK_SLOTS} read from {@code effectiveConfig},
+     * and any job-level parallelism is ignored (a warning is logged). Otherwise the parallelism
+     * from the job spec is returned unchanged.
+     *
+     * <p>NOTE(review): the slot count is read from {@code effectiveConfig} at call time, so this
+     * presumably expects the user Flink configuration to have been applied already — confirm.
+     *
+     * @return the parallelism to set as {@code CoreOptions.DEFAULT_PARALLELISM}
+     */
+    private int getParallelism() {
+        if (spec.getTaskManager() != null && 
spec.getTaskManager().getReplicas() != null) {
+            // Replicas take precedence; surface a job parallelism that is being overridden.
+            if (spec.getJob().getParallelism() > 0) {
+                LOG.warn("Job parallelism setting is ignored as TaskManager 
replicas are set");
+            }
+            return spec.getTaskManager().getReplicas()
+                    * effectiveConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
+        }
+
+        return spec.getJob().getParallelism();
+    }
+
     protected Configuration build() {
 
         // Set cluster config
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
index 115791b..400a2cc 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.crd.spec;
 import org.apache.flink.annotation.Experimental;
 
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.model.annotation.SpecReplicas;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -33,6 +34,9 @@ public class TaskManagerSpec {
     /** Resource specification for the TaskManager pods. */
     private Resource resource;
 
+    /** Number of TaskManager replicas. If defined, takes precedence over 
parallelism */
+    @SpecReplicas private Integer replicas;
+
     /** TaskManager pod template. It will be merged with 
FlinkDeploymentSpec.podTemplate. */
     private Pod podTemplate;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index fd61d0c..c2f6d0d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -50,4 +50,7 @@ public class FlinkDeploymentStatus extends 
CommonStatus<FlinkDeploymentSpec> {
     /** Status of the last reconcile operation. */
     private FlinkDeploymentReconciliationStatus reconciliationStatus =
             new FlinkDeploymentReconciliationStatus();
+
+    /** Information about the TaskManagers for the scale subresource. */
+    private TaskManagerInfo taskManager;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/TaskManagerInfo.java
similarity index 65%
copy from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
copy to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/TaskManagerInfo.java
index 115791b..1e5ef43 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/TaskManagerInfo.java
@@ -15,24 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.crd.spec;
+package org.apache.flink.kubernetes.operator.crd.status;
 
 import org.apache.flink.annotation.Experimental;
 
-import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.model.annotation.LabelSelector;
+import io.fabric8.kubernetes.model.annotation.StatusReplicas;
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-/** TaskManager spec. */
+/** Information about the TaskManagers for the scale subresource. */
 @Experimental
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
-public class TaskManagerSpec {
-    /** Resource specification for the TaskManager pods. */
-    private Resource resource;
+@Builder(toBuilder = true)
+public class TaskManagerInfo {
+    /** TaskManager label selector. */
+    @LabelSelector private String labelSelector;
 
-    /** TaskManager pod template. It will be merged with 
FlinkDeploymentSpec.podTemplate. */
-    private Pod podTemplate;
+    /** Number of TaskManager replicas if defined in the spec. */
+    @StatusReplicas private int replicas = 0;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index cd28a57..bd2c5bb 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -67,12 +68,22 @@ public class ReconciliationUtils {
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
     public static <SPEC extends AbstractFlinkSpec> void 
updateForSpecReconciliationSuccess(
-            AbstractFlinkResource<SPEC, ?> target, JobState 
stateAfterReconcile) {
-        var commonStatus = target.getStatus();
+            AbstractFlinkResource<SPEC, ?> target,
+            JobState stateAfterReconcile,
+            Configuration conf) {
+        var status = target.getStatus();
         var spec = target.getSpec();
 
-        ReconciliationStatus<SPEC> reconciliationStatus = 
commonStatus.getReconciliationStatus();
-        commonStatus.setError(null);
+        ReconciliationStatus<SPEC> reconciliationStatus = 
status.getReconciliationStatus();
+        status.setError(null);
+
+        // For application deployments we update the taskmanager info
+        if (target instanceof FlinkDeployment && spec.getJob() != null) {
+            ((FlinkDeploymentStatus) status)
+                    .setTaskManager(
+                            getTaskManagerInfo(
+                                    target.getMetadata().getName(), conf, 
stateAfterReconcile));
+        }
 
         var clonedSpec = ReconciliationUtils.clone(spec);
         AbstractFlinkSpec lastReconciledSpec = 
reconciliationStatus.deserializeLastReconciledSpec();
@@ -116,6 +127,16 @@ public class ReconciliationUtils {
         
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
     }
 
+    /**
+     * Builds the TaskManager information stored in the deployment status for the scale
+     * subresource.
+     *
+     * @param name name of the FlinkDeployment resource, embedded in the TaskManager label selector
+     * @param conf configuration from which the number of TaskManagers is derived
+     * @param jobState job state after the reconcile step
+     * @return the label selector and TaskManager count when {@code jobState} is RUNNING;
+     *     otherwise an empty selector and zero replicas
+     */
+    private static TaskManagerInfo getTaskManagerInfo(
+            String name, Configuration conf, JobState jobState) {
+        // NOTE(review): assumes TaskManager pods carry labels component=taskmanager and
+        // app=<deployment name>; confirm against the labels Flink's native Kubernetes sets.
+        var labelSelector = "component=taskmanager,app=" + name;
+        if (jobState == JobState.RUNNING) {
+            return new TaskManagerInfo(labelSelector, 
FlinkUtils.getNumTaskManagers(conf));
+        } else {
+            // Not running: report no selector and no replicas.
+            return new TaskManagerInfo("", 0);
+        }
+    }
+
     public static void updateForReconciliationError(
             AbstractFlinkResource<?, ?> target, String error) {
         target.getStatus().setError(error);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
index 51786f5..6c77657 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -71,14 +72,15 @@ public abstract class AbstractDeploymentReconciler 
implements Reconciler<FlinkDe
         return false;
     }
 
-    protected boolean newSpecIsAlreadyDeployed(FlinkDeployment flinkApp) {
+    protected boolean newSpecIsAlreadyDeployed(FlinkDeployment flinkApp, 
Configuration deployConf) {
         FlinkDeploymentSpec deployedSpec = 
ReconciliationUtils.getDeployedSpec(flinkApp);
         if (flinkApp.getSpec().equals(deployedSpec)) {
             LOG.info(
                     "The new spec matches the currently deployed last stable 
spec. No upgrade needed.");
             ReconciliationUtils.updateForSpecReconciliationSuccess(
                     flinkApp,
-                    deployedSpec.getJob() != null ? 
deployedSpec.getJob().getState() : null);
+                    deployedSpec.getJob() != null ? 
deployedSpec.getJob().getState() : null,
+                    deployConf);
             return true;
         }
         return false;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 5c8030b..01bfe69 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -89,7 +89,8 @@ public class ApplicationReconciler extends 
AbstractDeploymentReconciler {
                     false);
             IngressUtils.updateIngressRules(
                     deployMeta, currentDeploySpec, deployConfig, 
kubernetesClient);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, 
JobState.RUNNING);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    flinkApp, JobState.RUNNING, deployConfig);
             return;
         }
 
@@ -103,7 +104,7 @@ public class ApplicationReconciler extends 
AbstractDeploymentReconciler {
         Configuration observeConfig = configManager.getObserveConfig(flinkApp);
         boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
         if (specChanged) {
-            if (newSpecIsAlreadyDeployed(flinkApp)) {
+            if (newSpecIsAlreadyDeployed(flinkApp, deployConfig)) {
                 return;
             }
             LOG.debug("Detected spec change, starting upgrade process.");
@@ -134,7 +135,8 @@ public class ApplicationReconciler extends 
AbstractDeploymentReconciler {
                         lastReconciledSpec.getJob().getUpgradeMode() == 
UpgradeMode.LAST_STATE);
                 stateAfterReconcile = JobState.RUNNING;
             }
-            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, 
stateAfterReconcile);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    flinkApp, stateAfterReconcile, deployConfig);
             IngressUtils.updateIngressRules(
                     deployMeta, currentDeploySpec, deployConfig, 
kubernetesClient);
         } else if (ReconciliationUtils.shouldRollBack(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 18045bf..6057178 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -72,22 +72,21 @@ public class SessionReconciler extends 
AbstractDeploymentReconciler {
             
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
             IngressUtils.updateIngressRules(
                     flinkApp.getMetadata(), currentDeploySpec, conf, 
kubernetesClient);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, 
null);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, 
null, conf);
             return;
         }
 
         Configuration observeConfig = configManager.getObserveConfig(flinkApp);
         boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
         if (specChanged) {
-            if (newSpecIsAlreadyDeployed(flinkApp)) {
+            var deployConf =
+                    configManager.getDeployConfig(flinkApp.getMetadata(), 
currentDeploySpec);
+            if (newSpecIsAlreadyDeployed(flinkApp, deployConf)) {
                 return;
             }
             LOG.debug("Detected spec change, starting upgrade process.");
-            upgradeSessionCluster(
-                    flinkApp,
-                    currentDeploySpec,
-                    configManager.getDeployConfig(flinkApp.getMetadata(), 
currentDeploySpec));
-            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, 
null);
+            upgradeSessionCluster(flinkApp, currentDeploySpec, deployConf);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, 
null, deployConf);
         } else if (ReconciliationUtils.shouldRollBack(
                 flinkService, reconciliationStatus, observeConfig)) {
             rollbackSessionCluster(flinkApp);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
index c46f3e3..0079d80 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
@@ -92,7 +92,7 @@ public class FlinkSessionJobReconciler implements 
Reconciler<FlinkSessionJob> {
                                     
flinkSessionJob.getSpec().getJob().getInitialSavepointPath())
                             .orElse(null));
             ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    flinkSessionJob, JobState.RUNNING);
+                    flinkSessionJob, JobState.RUNNING, sessionJobConfig);
             return;
         }
 
@@ -127,7 +127,7 @@ public class FlinkSessionJobReconciler implements 
Reconciler<FlinkSessionJob> {
                 stateAfterReconcile = JobState.RUNNING;
             }
             ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    flinkSessionJob, stateAfterReconcile);
+                    flinkSessionJob, stateAfterReconcile, sessionJobConfig);
         } else {
             if (!SavepointUtils.triggerSavepointIfNeeded(
                     flinkService, flinkSessionJob, sessionJobConfig)) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 3896f05..f672d65 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -19,7 +19,9 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
@@ -278,4 +280,10 @@ public class FlinkUtils {
     public static boolean clusterShutdownDisabled(FlinkDeploymentSpec spec) {
         return spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14);
     }
+
+    /**
+     * Returns the number of TaskManagers needed to provide the configured default parallelism.
+     *
+     * <p>Computed as {@code ceil(parallelism / taskSlots)}. The intermediate sum is evaluated in
+     * {@code long} arithmetic so that a parallelism close to {@link Integer#MAX_VALUE} does not
+     * overflow into a negative TaskManager count.
+     *
+     * @param conf configuration holding {@code CoreOptions.DEFAULT_PARALLELISM} and
+     *     {@code TaskManagerOptions.NUM_TASK_SLOTS}
+     * @return the number of TaskManagers required to host the default parallelism
+     */
+    public static int getNumTaskManagers(Configuration conf) {
+        int parallelism = conf.get(CoreOptions.DEFAULT_PARALLELISM);
+        int taskSlots = conf.get(TaskManagerOptions.NUM_TASK_SLOTS);
+        // Widen to long: parallelism + taskSlots - 1 may exceed Integer.MAX_VALUE.
+        return (int) (((long) parallelism + taskSlots - 1) / taskSlots);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 54611fe..0c28ff8 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -43,6 +43,8 @@ import 
org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.util.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -82,7 +84,7 @@ public class DefaultValidator implements 
FlinkResourceValidator {
                         deployment.getMetadata().getName(),
                         deployment.getMetadata().getNamespace()),
                 validateLogConfig(spec.getLogConfiguration()),
-                validateJobSpec(spec.getJob(), effectiveConfig),
+                validateJobSpec(spec.getJob(), spec.getTaskManager(), 
effectiveConfig),
                 validateJmSpec(spec.getJobManager(), effectiveConfig),
                 validateTmSpec(spec.getTaskManager()),
                 validateSpecChange(deployment, effectiveConfig),
@@ -155,15 +157,12 @@ public class DefaultValidator implements 
FlinkResourceValidator {
         return Optional.empty();
     }
 
-    private Optional<String> validateJobSpec(JobSpec job, Map<String, String> 
confMap) {
+    private Optional<String> validateJobSpec(
+            JobSpec job, @Nullable TaskManagerSpec tm, Map<String, String> 
confMap) {
         if (job == null) {
             return Optional.empty();
         }
 
-        if (job.getParallelism() < 1) {
-            return Optional.of("Job parallelism must be larger than 0");
-        }
-
         if (job.getJarURI() == null) {
             return Optional.of("Jar URI must be defined");
         }
@@ -206,6 +205,14 @@ public class DefaultValidator implements 
FlinkResourceValidator {
             }
         }
 
+        var tmReplicasDefined = tm != null && tm.getReplicas() != null;
+
+        if (tmReplicasDefined && tm.getReplicas() < 1) {
+            return Optional.of("TaskManager replicas must be larger than 0");
+        } else if (!tmReplicasDefined && job.getParallelism() < 1) {
+            return Optional.of("Job parallelism must be larger than 0");
+        }
+
         return Optional.empty();
     }
 
@@ -235,6 +242,10 @@ public class DefaultValidator implements 
FlinkResourceValidator {
             return Optional.empty();
         }
 
+        if (tmSpec.getReplicas() != null && tmSpec.getReplicas() < 1) {
+            return Optional.of("TaskManager replicas should not be configured 
less than one.");
+        }
+
         return validateResources("TaskManager", tmSpec.getResource());
     }
 
@@ -336,7 +347,7 @@ public class DefaultValidator implements 
FlinkResourceValidator {
         return firstPresent(
                 validateNotApplicationCluster(sessionCluster),
                 validateSessionClusterId(sessionJob, sessionCluster),
-                validateJobSpec(sessionJob.getSpec().getJob(), 
effectiveConfig));
+                validateJobSpec(sessionJob.getSpec().getJob(), null, 
effectiveConfig));
     }
 
     private Optional<String> validateJobNotEmpty(FlinkSessionJob sessionJob) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 306eb3a..2ad7c1e 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -167,7 +167,7 @@ public class TestUtils {
                 .flinkVersion(version)
                 .flinkConfiguration(conf)
                 .jobManager(new JobManagerSpec(new Resource(1.0, "2048m"), 1, 
null))
-                .taskManager(new TaskManagerSpec(new Resource(1.0, "2048m"), 
null))
+                .taskManager(new TaskManagerSpec(new Resource(1.0, "2048m"), 
null, null))
                 .build();
     }
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 2cc456c..df5bf41 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.utils.Constants;
@@ -293,7 +294,7 @@ public class FlinkConfigBuilderTest {
     @Test
     public void testApplyJobOrSessionSpec() throws Exception {
         flinkDeployment.getSpec().getJob().setAllowNonRestoredState(true);
-        final Configuration configuration =
+        var configuration =
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
                         .applyJobOrSessionSpec()
                         .build();
@@ -305,6 +306,18 @@ public class FlinkConfigBuilderTest {
         Assertions.assertEquals(SAMPLE_JAR, 
configuration.get(PipelineOptions.JARS).get(0));
         Assertions.assertEquals(
                 Integer.valueOf(2), 
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+
+        var dep = ReconciliationUtils.clone(flinkDeployment);
+        dep.getSpec().setTaskManager(new TaskManagerSpec());
+        dep.getSpec().getTaskManager().setReplicas(3);
+        
dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(),
 "4");
+        configuration =
+                new FlinkConfigBuilder(dep, new Configuration())
+                        .applyFlinkConfiguration()
+                        .applyJobOrSessionSpec()
+                        .build();
+
+        Assertions.assertEquals(12, 
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index 70670dc..5be498b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -102,7 +102,8 @@ public class FlinkConfigManagerTest {
         deployment.getSpec().setLogConfiguration(Map.of(Constants.CONFIG_FILE_LOG4J_NAME, "test"));
         deployment.getSpec().setPodTemplate(new Pod());
 
-        ReconciliationUtils.updateForSpecReconciliationSuccess(deployment, JobState.RUNNING);
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                deployment, JobState.RUNNING, config);
         Configuration deployConfig = configManager.getObserveConfig(deployment);
         assertFalse(
                 deployConfig.contains(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 7b0c0c7..ffae61f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -287,6 +288,10 @@ public class FlinkDeploymentControllerTest {
         List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("s0", jobs.get(0).f0);
+        assertEquals(
+                new TaskManagerInfo(
+                        "component=taskmanager,app=" + appCluster.getMetadata().getName(), 1),
+                appCluster.getStatus().getTaskManager());
 
         List<Tuple2<String, JobStatusMessage>> previousJobs = new ArrayList<>(jobs);
         appCluster.getSpec().getJob().setInitialSavepointPath("s1");
@@ -322,6 +327,7 @@ public class FlinkDeploymentControllerTest {
                         .getSavepointInfo()
                         .getSavepointHistory()
                         .size());
+        assertEquals(new TaskManagerInfo("", 0), appCluster.getStatus().getTaskManager());
 
         testController.reconcile(appCluster, context);
         jobs = flinkService.listJobs();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
index c5c823d..f5afb2b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -93,7 +93,8 @@ public class FlinkServiceTest {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         JobStatus jobStatus = deployment.getStatus().getJobStatus();
         jobStatus.setJobId(jobID.toHexString());
-        ReconciliationUtils.updateForSpecReconciliationSuccess(deployment, JobState.RUNNING);
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                deployment, JobState.RUNNING, new Configuration());
 
         deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         deployment.getStatus().getJobStatus().setState("RUNNING");
@@ -130,7 +131,8 @@ public class FlinkServiceTest {
         JobStatus jobStatus = deployment.getStatus().getJobStatus();
         jobStatus.setJobId(jobID.toHexString());
         jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
-        ReconciliationUtils.updateForSpecReconciliationSuccess(deployment, JobState.RUNNING);
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                deployment, JobState.RUNNING, new Configuration());
 
         flinkService.cancelJob(deployment, UpgradeMode.SAVEPOINT);
         assertTrue(stopWithSavepointFuture.isDone());
@@ -143,7 +145,8 @@ public class FlinkServiceTest {
     @Test
     public void testCancelJobWithLastStateUpgradeMode() throws Exception {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        ReconciliationUtils.updateForSpecReconciliationSuccess(deployment, JobState.RUNNING);
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                deployment, JobState.RUNNING, new Configuration());
         final TestingClusterClient<String> testingClusterClient =
                 new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
         final FlinkService flinkService = createFlinkService(testingClusterClient);
@@ -198,7 +201,8 @@ public class FlinkServiceTest {
 
         final JobID jobID = JobID.generate();
         final FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
-        ReconciliationUtils.updateForSpecReconciliationSuccess(flinkDeployment, JobState.RUNNING);
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                flinkDeployment, JobState.RUNNING, new Configuration());
         JobStatus jobStatus = new JobStatus();
         jobStatus.setJobId(jobID.toString());
         flinkDeployment.getStatus().setJobStatus(jobStatus);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 09a8968..dae68ff 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -19,6 +19,9 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -106,6 +109,22 @@ public class FlinkUtilsTest {
                 clusterId, kubernetesClient.getNamespace(), kubernetesClient);
     }
 
+    @Test
+    public void testComputeNumTms() {
+        Configuration conf = new Configuration();
+        conf.set(CoreOptions.DEFAULT_PARALLELISM, 2);
+        conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+
+        assertEquals(2, FlinkUtils.getNumTaskManagers(conf));
+
+        conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+        assertEquals(1, FlinkUtils.getNumTaskManagers(conf));
+
+        conf.set(CoreOptions.DEFAULT_PARALLELISM, 7);
+        conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+        assertEquals(4, FlinkUtils.getNumTaskManagers(conf));
+    }
+
     private void createHAConfigMapWithData(
             String configMapName, String clusterId, Map<String, String> data) {
         final ConfigMap kubernetesConfigMap =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 289b498..7e1f21e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -54,7 +54,8 @@ public class ReconciliationUtilsTest {
         current.getStatus()
                 .getReconciliationStatus()
                 .serializeAndSetLastReconciledSpec(ReconciliationUtils.clone(current.getSpec()));
-        ReconciliationUtils.updateForSpecReconciliationSuccess(current, JobState.SUSPENDED);
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                current, JobState.SUSPENDED, new Configuration());
 
         UpdateControl<FlinkDeployment> updateControl =
                 ReconciliationUtils.toUpdateControl(operatorConfiguration, current, previous, true);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 24af718..235bb1b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentReconciliationStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -76,9 +77,21 @@ public class DefaultValidatorTest {
         testError(
                 dep -> dep.getSpec().getJob().setParallelism(0),
                 "Job parallelism must be larger than 0");
+
         testError(
-                dep -> dep.getSpec().getJob().setParallelism(-1),
-                "Job parallelism must be larger than 0");
+                dep -> {
+                    var tmSpec = new TaskManagerSpec();
+                    tmSpec.setReplicas(0);
+                    dep.getSpec().setTaskManager(tmSpec);
+                },
+                "TaskManager replicas must be larger than 0");
+
+        testSuccess(
+                dep -> {
+                    dep.getSpec().getTaskManager().setReplicas(1);
+                    dep.getSpec().getJob().setParallelism(0);
+                });
+
         testError(
                 dep -> {
                     dep.getSpec().setFlinkConfiguration(new HashMap<>());
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 6246a36..83f3b9e 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -6048,6 +6048,8 @@ spec:
                       memory:
                         type: string
                     type: object
+                  replicas:
+                    type: integer
                   podTemplate:
                     properties:
                       apiVersion:
@@ -9112,6 +9114,13 @@ spec:
                     - ROLLED_BACK
                     type: string
                 type: object
+              taskManager:
+                properties:
+                  labelSelector:
+                    type: string
+                  replicas:
+                    type: integer
+                type: object
               jobStatus:
                 properties:
                   jobName:
@@ -9178,4 +9187,8 @@ spec:
     served: true
     storage: true
     subresources:
+      scale:
+        labelSelectorPath: .status.taskManager.labelSelector
+        specReplicasPath: .spec.taskManager.replicas
+        statusReplicasPath: .status.taskManager.replicas
       status: {}

Reply via email to