This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 1dd9850 [FLINK-27685] Add scale subresource and basic HPA support
1dd9850 is described below
commit 1dd9850fb37d4f7d572f46b247470bda13fb3660
Author: Gyula Fora <[email protected]>
AuthorDate: Fri Jun 10 12:19:26 2022 +0200
[FLINK-27685] Add scale subresource and basic HPA support
---
docs/content/docs/custom-resource/reference.md | 12 +++++++
examples/hpa/basic-hpa.yaml | 37 ++++++++++++++++++++++
.../operator/config/FlinkConfigBuilder.java | 17 +++++++---
.../operator/crd/spec/TaskManagerSpec.java | 4 +++
.../operator/crd/status/FlinkDeploymentStatus.java | 3 ++
.../TaskManagerInfo.java} | 19 ++++++-----
.../operator/reconciler/ReconciliationUtils.java | 29 ++++++++++++++---
.../deployment/AbstractDeploymentReconciler.java | 6 ++--
.../deployment/ApplicationReconciler.java | 8 +++--
.../reconciler/deployment/SessionReconciler.java | 13 ++++----
.../sessionjob/FlinkSessionJobReconciler.java | 4 +--
.../kubernetes/operator/utils/FlinkUtils.java | 8 +++++
.../operator/validation/DefaultValidator.java | 25 +++++++++++----
.../flink/kubernetes/operator/TestUtils.java | 2 +-
.../operator/config/FlinkConfigBuilderTest.java | 15 ++++++++-
.../operator/config/FlinkConfigManagerTest.java | 3 +-
.../controller/FlinkDeploymentControllerTest.java | 6 ++++
.../operator/service/FlinkServiceTest.java | 12 ++++---
.../kubernetes/operator/utils/FlinkUtilsTest.java | 19 +++++++++++
.../operator/utils/ReconciliationUtilsTest.java | 3 +-
.../operator/validation/DefaultValidatorTest.java | 17 ++++++++--
.../crds/flinkdeployments.flink.apache.org-v1.yml | 13 ++++++++
22 files changed, 228 insertions(+), 47 deletions(-)
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index 7f44e83..100db9d 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -149,6 +149,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| Parameter | Type | Docs |
| ----------| ---- | ---- |
| resource | org.apache.flink.kubernetes.operator.crd.spec.Resource | Resource
specification for the TaskManager pods. |
+| replicas | java.lang.Integer | Number of TaskManager replicas. If defined, takes precedence over the job parallelism. |
| podTemplate | io.fabric8.kubernetes.api.model.Pod | TaskManager pod
template. It will be merged with FlinkDeploymentSpec.podTemplate. |
### UpgradeMode
@@ -188,6 +189,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| clusterInfo | java.util.Map<java.lang.String,java.lang.String> | Config
information from running clusters. |
| jobManagerDeploymentStatus |
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus |
Last observed status of the JobManager deployment. |
| reconciliationStatus |
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentReconciliationStatus
| Status of the last reconcile operation. |
+| taskManager |
org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo | Information
about the TaskManagers for the scale subresource. |
### FlinkSessionJobReconciliationStatus
**Class**:
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobReconciliationStatus
@@ -287,3 +289,13 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| PERIODIC | Savepoint periodically triggered by the operator. |
| UPGRADE | Savepoint triggered during stateful upgrade. |
| UNKNOWN | Savepoint trigger mechanism unknown, such as savepoint retrieved
directly from Flink job. |
+
+### TaskManagerInfo
+**Class**: org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo
+
+**Description**: Information about the TaskManagers for the scale subresource.
+
+| Parameter | Type | Docs |
+| ----------| ---- | ---- |
+| labelSelector | java.lang.String | TaskManager label selector. |
+| replicas | int | Number of TaskManager replicas if defined in the spec. |
diff --git a/examples/hpa/basic-hpa.yaml b/examples/hpa/basic-hpa.yaml
new file mode 100644
index 0000000..e35be9c
--- /dev/null
+++ b/examples/hpa/basic-hpa.yaml
@@ -0,0 +1,37 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: autoscaling/v2
+kind: HorizontalPodAutoscaler
+metadata:
+ name: basic-hpa
+ namespace: default
+spec:
+ minReplicas: 1
+ maxReplicas: 3
+ metrics:
+ - type: Resource
+ resource:
+ name: memory
+ target:
+ type: Utilization
+ averageValue: 10Mi
+ scaleTargetRef:
+ apiVersion: flink.apache.org/v1beta1
+ kind: FlinkDeployment
+ name: basic
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 939dee1..2dfd9fd 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -226,11 +226,8 @@ public class FlinkConfigBuilder {
DeploymentOptions.TARGET,
KubernetesDeploymentTarget.APPLICATION.getName());
final URI uri = new URI(spec.getJob().getJarURI());
effectiveConfig.set(PipelineOptions.JARS,
Collections.singletonList(uri.toString()));
+ effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM,
getParallelism());
- if (spec.getJob().getParallelism() > 0) {
- effectiveConfig.set(
- CoreOptions.DEFAULT_PARALLELISM,
spec.getJob().getParallelism());
- }
if (spec.getJob().getAllowNonRestoredState() != null) {
effectiveConfig.set(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
@@ -243,6 +240,18 @@ public class FlinkConfigBuilder {
return this;
}
+ private int getParallelism() {
+ if (spec.getTaskManager() != null &&
spec.getTaskManager().getReplicas() != null) {
+ if (spec.getJob().getParallelism() > 0) {
+ LOG.warn("Job parallelism setting is ignored as TaskManager
replicas are set");
+ }
+ return spec.getTaskManager().getReplicas()
+ * effectiveConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
+ }
+
+ return spec.getJob().getParallelism();
+ }
+
protected Configuration build() {
// Set cluster config
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
index 115791b..400a2cc 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.crd.spec;
import org.apache.flink.annotation.Experimental;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.model.annotation.SpecReplicas;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -33,6 +34,9 @@ public class TaskManagerSpec {
/** Resource specification for the TaskManager pods. */
private Resource resource;
+ /** Number of TaskManager replicas. If defined, takes precedence over the job parallelism. */
+ @SpecReplicas private Integer replicas;
+
/** TaskManager pod template. It will be merged with
FlinkDeploymentSpec.podTemplate. */
private Pod podTemplate;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index fd61d0c..c2f6d0d 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -50,4 +50,7 @@ public class FlinkDeploymentStatus extends
CommonStatus<FlinkDeploymentSpec> {
/** Status of the last reconcile operation. */
private FlinkDeploymentReconciliationStatus reconciliationStatus =
new FlinkDeploymentReconciliationStatus();
+
+ /** Information about the TaskManagers for the scale subresource. */
+ private TaskManagerInfo taskManager;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/TaskManagerInfo.java
similarity index 65%
copy from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
copy to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/TaskManagerInfo.java
index 115791b..1e5ef43 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/TaskManagerInfo.java
@@ -15,24 +15,27 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.crd.spec;
+package org.apache.flink.kubernetes.operator.crd.status;
import org.apache.flink.annotation.Experimental;
-import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.model.annotation.LabelSelector;
+import io.fabric8.kubernetes.model.annotation.StatusReplicas;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
-/** TaskManager spec. */
+/** Information about the TaskManagers for the scale subresource. */
@Experimental
@Data
@NoArgsConstructor
@AllArgsConstructor
-public class TaskManagerSpec {
- /** Resource specification for the TaskManager pods. */
- private Resource resource;
+@Builder(toBuilder = true)
+public class TaskManagerInfo {
+ /** TaskManager label selector. */
+ @LabelSelector private String labelSelector;
- /** TaskManager pod template. It will be merged with
FlinkDeploymentSpec.podTemplate. */
- private Pod podTemplate;
+ /** Number of TaskManager replicas if defined in the spec. */
+ @StatusReplicas private int replicas = 0;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index cd28a57..bd2c5bb 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -35,6 +35,7 @@ import
org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -67,12 +68,22 @@ public class ReconciliationUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
public static <SPEC extends AbstractFlinkSpec> void
updateForSpecReconciliationSuccess(
- AbstractFlinkResource<SPEC, ?> target, JobState
stateAfterReconcile) {
- var commonStatus = target.getStatus();
+ AbstractFlinkResource<SPEC, ?> target,
+ JobState stateAfterReconcile,
+ Configuration conf) {
+ var status = target.getStatus();
var spec = target.getSpec();
- ReconciliationStatus<SPEC> reconciliationStatus =
commonStatus.getReconciliationStatus();
- commonStatus.setError(null);
+ ReconciliationStatus<SPEC> reconciliationStatus =
status.getReconciliationStatus();
+ status.setError(null);
+
+ // For application deployments we update the taskmanager info
+ if (target instanceof FlinkDeployment && spec.getJob() != null) {
+ ((FlinkDeploymentStatus) status)
+ .setTaskManager(
+ getTaskManagerInfo(
+ target.getMetadata().getName(), conf,
stateAfterReconcile));
+ }
var clonedSpec = ReconciliationUtils.clone(spec);
AbstractFlinkSpec lastReconciledSpec =
reconciliationStatus.deserializeLastReconciledSpec();
@@ -116,6 +127,16 @@ public class ReconciliationUtils {
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
}
+ private static TaskManagerInfo getTaskManagerInfo(
+ String name, Configuration conf, JobState jobState) {
+ var labelSelector = "component=taskmanager,app=" + name;
+ if (jobState == JobState.RUNNING) {
+ return new TaskManagerInfo(labelSelector,
FlinkUtils.getNumTaskManagers(conf));
+ } else {
+ return new TaskManagerInfo("", 0);
+ }
+ }
+
public static void updateForReconciliationError(
AbstractFlinkResource<?, ?> target, String error) {
target.getStatus().setError(error);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
index 51786f5..6c77657 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -71,14 +72,15 @@ public abstract class AbstractDeploymentReconciler
implements Reconciler<FlinkDe
return false;
}
- protected boolean newSpecIsAlreadyDeployed(FlinkDeployment flinkApp) {
+ protected boolean newSpecIsAlreadyDeployed(FlinkDeployment flinkApp,
Configuration deployConf) {
FlinkDeploymentSpec deployedSpec =
ReconciliationUtils.getDeployedSpec(flinkApp);
if (flinkApp.getSpec().equals(deployedSpec)) {
LOG.info(
"The new spec matches the currently deployed last stable
spec. No upgrade needed.");
ReconciliationUtils.updateForSpecReconciliationSuccess(
flinkApp,
- deployedSpec.getJob() != null ?
deployedSpec.getJob().getState() : null);
+ deployedSpec.getJob() != null ?
deployedSpec.getJob().getState() : null,
+ deployConf);
return true;
}
return false;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 5c8030b..01bfe69 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -89,7 +89,8 @@ public class ApplicationReconciler extends
AbstractDeploymentReconciler {
false);
IngressUtils.updateIngressRules(
deployMeta, currentDeploySpec, deployConfig,
kubernetesClient);
- ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp,
JobState.RUNNING);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ flinkApp, JobState.RUNNING, deployConfig);
return;
}
@@ -103,7 +104,7 @@ public class ApplicationReconciler extends
AbstractDeploymentReconciler {
Configuration observeConfig = configManager.getObserveConfig(flinkApp);
boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
if (specChanged) {
- if (newSpecIsAlreadyDeployed(flinkApp)) {
+ if (newSpecIsAlreadyDeployed(flinkApp, deployConfig)) {
return;
}
LOG.debug("Detected spec change, starting upgrade process.");
@@ -134,7 +135,8 @@ public class ApplicationReconciler extends
AbstractDeploymentReconciler {
lastReconciledSpec.getJob().getUpgradeMode() ==
UpgradeMode.LAST_STATE);
stateAfterReconcile = JobState.RUNNING;
}
- ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp,
stateAfterReconcile);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ flinkApp, stateAfterReconcile, deployConfig);
IngressUtils.updateIngressRules(
deployMeta, currentDeploySpec, deployConfig,
kubernetesClient);
} else if (ReconciliationUtils.shouldRollBack(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 18045bf..6057178 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -72,22 +72,21 @@ public class SessionReconciler extends
AbstractDeploymentReconciler {
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
IngressUtils.updateIngressRules(
flinkApp.getMetadata(), currentDeploySpec, conf,
kubernetesClient);
- ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp,
null);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp,
null, conf);
return;
}
Configuration observeConfig = configManager.getObserveConfig(flinkApp);
boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
if (specChanged) {
- if (newSpecIsAlreadyDeployed(flinkApp)) {
+ var deployConf =
+ configManager.getDeployConfig(flinkApp.getMetadata(),
currentDeploySpec);
+ if (newSpecIsAlreadyDeployed(flinkApp, deployConf)) {
return;
}
LOG.debug("Detected spec change, starting upgrade process.");
- upgradeSessionCluster(
- flinkApp,
- currentDeploySpec,
- configManager.getDeployConfig(flinkApp.getMetadata(),
currentDeploySpec));
- ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp,
null);
+ upgradeSessionCluster(flinkApp, currentDeploySpec, deployConf);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp,
null, deployConf);
} else if (ReconciliationUtils.shouldRollBack(
flinkService, reconciliationStatus, observeConfig)) {
rollbackSessionCluster(flinkApp);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
index c46f3e3..0079d80 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
@@ -92,7 +92,7 @@ public class FlinkSessionJobReconciler implements
Reconciler<FlinkSessionJob> {
flinkSessionJob.getSpec().getJob().getInitialSavepointPath())
.orElse(null));
ReconciliationUtils.updateForSpecReconciliationSuccess(
- flinkSessionJob, JobState.RUNNING);
+ flinkSessionJob, JobState.RUNNING, sessionJobConfig);
return;
}
@@ -127,7 +127,7 @@ public class FlinkSessionJobReconciler implements
Reconciler<FlinkSessionJob> {
stateAfterReconcile = JobState.RUNNING;
}
ReconciliationUtils.updateForSpecReconciliationSuccess(
- flinkSessionJob, stateAfterReconcile);
+ flinkSessionJob, stateAfterReconcile, sessionJobConfig);
} else {
if (!SavepointUtils.triggerSavepointIfNeeded(
flinkService, flinkSessionJob, sessionJobConfig)) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 3896f05..f672d65 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -19,7 +19,9 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
@@ -278,4 +280,10 @@ public class FlinkUtils {
public static boolean clusterShutdownDisabled(FlinkDeploymentSpec spec) {
return spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14);
}
+
+ public static int getNumTaskManagers(Configuration conf) {
+ int parallelism = conf.get(CoreOptions.DEFAULT_PARALLELISM);
+ int taskSlots = conf.get(TaskManagerOptions.NUM_TASK_SLOTS);
+ return (parallelism + taskSlots - 1) / taskSlots;
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 54611fe..0c28ff8 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -43,6 +43,8 @@ import
org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.StringUtils;
+import javax.annotation.Nullable;
+
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -82,7 +84,7 @@ public class DefaultValidator implements
FlinkResourceValidator {
deployment.getMetadata().getName(),
deployment.getMetadata().getNamespace()),
validateLogConfig(spec.getLogConfiguration()),
- validateJobSpec(spec.getJob(), effectiveConfig),
+ validateJobSpec(spec.getJob(), spec.getTaskManager(),
effectiveConfig),
validateJmSpec(spec.getJobManager(), effectiveConfig),
validateTmSpec(spec.getTaskManager()),
validateSpecChange(deployment, effectiveConfig),
@@ -155,15 +157,12 @@ public class DefaultValidator implements
FlinkResourceValidator {
return Optional.empty();
}
- private Optional<String> validateJobSpec(JobSpec job, Map<String, String>
confMap) {
+ private Optional<String> validateJobSpec(
+ JobSpec job, @Nullable TaskManagerSpec tm, Map<String, String>
confMap) {
if (job == null) {
return Optional.empty();
}
- if (job.getParallelism() < 1) {
- return Optional.of("Job parallelism must be larger than 0");
- }
-
if (job.getJarURI() == null) {
return Optional.of("Jar URI must be defined");
}
@@ -206,6 +205,14 @@ public class DefaultValidator implements
FlinkResourceValidator {
}
}
+ var tmReplicasDefined = tm != null && tm.getReplicas() != null;
+
+ if (tmReplicasDefined && tm.getReplicas() < 1) {
+ return Optional.of("TaskManager replicas must be larger than 0");
+ } else if (!tmReplicasDefined && job.getParallelism() < 1) {
+ return Optional.of("Job parallelism must be larger than 0");
+ }
+
return Optional.empty();
}
@@ -235,6 +242,10 @@ public class DefaultValidator implements
FlinkResourceValidator {
return Optional.empty();
}
+ if (tmSpec.getReplicas() != null && tmSpec.getReplicas() < 1) {
+ return Optional.of("TaskManager replicas should not be configured
less than one.");
+ }
+
return validateResources("TaskManager", tmSpec.getResource());
}
@@ -336,7 +347,7 @@ public class DefaultValidator implements
FlinkResourceValidator {
return firstPresent(
validateNotApplicationCluster(sessionCluster),
validateSessionClusterId(sessionJob, sessionCluster),
- validateJobSpec(sessionJob.getSpec().getJob(),
effectiveConfig));
+ validateJobSpec(sessionJob.getSpec().getJob(), null,
effectiveConfig));
}
private Optional<String> validateJobNotEmpty(FlinkSessionJob sessionJob) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 306eb3a..2ad7c1e 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -167,7 +167,7 @@ public class TestUtils {
.flinkVersion(version)
.flinkConfiguration(conf)
.jobManager(new JobManagerSpec(new Resource(1.0, "2048m"), 1,
null))
- .taskManager(new TaskManagerSpec(new Resource(1.0, "2048m"),
null))
+ .taskManager(new TaskManagerSpec(new Resource(1.0, "2048m"),
null, null))
.build();
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 2cc456c..df5bf41 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -33,6 +33,7 @@ import
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.utils.Constants;
@@ -293,7 +294,7 @@ public class FlinkConfigBuilderTest {
@Test
public void testApplyJobOrSessionSpec() throws Exception {
flinkDeployment.getSpec().getJob().setAllowNonRestoredState(true);
- final Configuration configuration =
+ var configuration =
new FlinkConfigBuilder(flinkDeployment, new Configuration())
.applyJobOrSessionSpec()
.build();
@@ -305,6 +306,18 @@ public class FlinkConfigBuilderTest {
Assertions.assertEquals(SAMPLE_JAR,
configuration.get(PipelineOptions.JARS).get(0));
Assertions.assertEquals(
Integer.valueOf(2),
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+
+ var dep = ReconciliationUtils.clone(flinkDeployment);
+ dep.getSpec().setTaskManager(new TaskManagerSpec());
+ dep.getSpec().getTaskManager().setReplicas(3);
+
dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(),
"4");
+ configuration =
+ new FlinkConfigBuilder(dep, new Configuration())
+ .applyFlinkConfiguration()
+ .applyJobOrSessionSpec()
+ .build();
+
+ Assertions.assertEquals(12,
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
}
@Test
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index 70670dc..5be498b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -102,7 +102,8 @@ public class FlinkConfigManagerTest {
deployment.getSpec().setLogConfiguration(Map.of(Constants.CONFIG_FILE_LOG4J_NAME,
"test"));
deployment.getSpec().setPodTemplate(new Pod());
- ReconciliationUtils.updateForSpecReconciliationSuccess(deployment,
JobState.RUNNING);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ deployment, JobState.RUNNING, config);
Configuration deployConfig =
configManager.getObserveConfig(deployment);
assertFalse(
deployConfig.contains(
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 7b0c0c7..ffae61f 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -32,6 +32,7 @@ import
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -287,6 +288,10 @@ public class FlinkDeploymentControllerTest {
List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
assertEquals("s0", jobs.get(0).f0);
+ assertEquals(
+ new TaskManagerInfo(
+ "component=taskmanager,app=" +
appCluster.getMetadata().getName(), 1),
+ appCluster.getStatus().getTaskManager());
List<Tuple2<String, JobStatusMessage>> previousJobs = new
ArrayList<>(jobs);
appCluster.getSpec().getJob().setInitialSavepointPath("s1");
@@ -322,6 +327,7 @@ public class FlinkDeploymentControllerTest {
.getSavepointInfo()
.getSavepointHistory()
.size());
+ assertEquals(new TaskManagerInfo("", 0),
appCluster.getStatus().getTaskManager());
testController.reconcile(appCluster, context);
jobs = flinkService.listJobs();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
index c5c823d..f5afb2b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -93,7 +93,8 @@ public class FlinkServiceTest {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
- ReconciliationUtils.updateForSpecReconciliationSuccess(deployment, JobState.RUNNING);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ deployment, JobState.RUNNING, new Configuration());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
deployment.getStatus().getJobStatus().setState("RUNNING");
@@ -130,7 +131,8 @@ public class FlinkServiceTest {
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
- ReconciliationUtils.updateForSpecReconciliationSuccess(deployment, JobState.RUNNING);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ deployment, JobState.RUNNING, new Configuration());
flinkService.cancelJob(deployment, UpgradeMode.SAVEPOINT);
assertTrue(stopWithSavepointFuture.isDone());
@@ -143,7 +145,8 @@ public class FlinkServiceTest {
@Test
public void testCancelJobWithLastStateUpgradeMode() throws Exception {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
- ReconciliationUtils.updateForSpecReconciliationSuccess(deployment, JobState.RUNNING);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ deployment, JobState.RUNNING, new Configuration());
final TestingClusterClient<String> testingClusterClient =
new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
final FlinkService flinkService = createFlinkService(testingClusterClient);
@@ -198,7 +201,8 @@ public class FlinkServiceTest {
final JobID jobID = JobID.generate();
final FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
- ReconciliationUtils.updateForSpecReconciliationSuccess(flinkDeployment, JobState.RUNNING);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ flinkDeployment, JobState.RUNNING, new Configuration());
JobStatus jobStatus = new JobStatus();
jobStatus.setJobId(jobID.toString());
flinkDeployment.getStatus().setJobStatus(jobStatus);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 09a8968..dae68ff 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -19,6 +19,9 @@
package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -106,6 +109,22 @@ public class FlinkUtilsTest {
clusterId, kubernetesClient.getNamespace(), kubernetesClient);
}
+ @Test
+ public void testComputeNumTms() {
+ Configuration conf = new Configuration();
+ conf.set(CoreOptions.DEFAULT_PARALLELISM, 2);
+ conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+
+ assertEquals(2, FlinkUtils.getNumTaskManagers(conf));
+
+ conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+ assertEquals(1, FlinkUtils.getNumTaskManagers(conf));
+
+ conf.set(CoreOptions.DEFAULT_PARALLELISM, 7);
+ conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+ assertEquals(4, FlinkUtils.getNumTaskManagers(conf));
+ }
+
private void createHAConfigMapWithData(
String configMapName, String clusterId, Map<String, String> data) {
final ConfigMap kubernetesConfigMap =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 289b498..7e1f21e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -54,7 +54,8 @@ public class ReconciliationUtilsTest {
current.getStatus()
.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(ReconciliationUtils.clone(current.getSpec()));
- ReconciliationUtils.updateForSpecReconciliationSuccess(current, JobState.SUSPENDED);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(
+ current, JobState.SUSPENDED, new Configuration());
UpdateControl<FlinkDeployment> updateControl =
ReconciliationUtils.toUpdateControl(operatorConfiguration, current, previous, true);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 24af718..235bb1b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -76,9 +77,21 @@ public class DefaultValidatorTest {
testError(
dep -> dep.getSpec().getJob().setParallelism(0),
"Job parallelism must be larger than 0");
+
testError(
- dep -> dep.getSpec().getJob().setParallelism(-1),
- "Job parallelism must be larger than 0");
+ dep -> {
+ var tmSpec = new TaskManagerSpec();
+ tmSpec.setReplicas(0);
+ dep.getSpec().setTaskManager(tmSpec);
+ },
+ "TaskManager replicas must be larger than 0");
+
+ testSuccess(
+ dep -> {
+ dep.getSpec().getTaskManager().setReplicas(1);
+ dep.getSpec().getJob().setParallelism(0);
+ });
+
testError(
dep -> {
dep.getSpec().setFlinkConfiguration(new HashMap<>());
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 6246a36..83f3b9e 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -6048,6 +6048,8 @@ spec:
memory:
type: string
type: object
+ replicas:
+ type: integer
podTemplate:
properties:
apiVersion:
@@ -9112,6 +9114,13 @@ spec:
- ROLLED_BACK
type: string
type: object
+ taskManager:
+ properties:
+ labelSelector:
+ type: string
+ replicas:
+ type: integer
+ type: object
jobStatus:
properties:
jobName:
@@ -9178,4 +9187,8 @@ spec:
served: true
storage: true
subresources:
+ scale:
+ labelSelectorPath: .status.taskManager.labelSelector
+ specReplicasPath: .spec.taskManager.replicas
+ statusReplicasPath: .status.taskManager.replicas
status: {}