This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new efefd31b [FLINK-30361] Do not redeploy task managers when scaling
standalone session cluster
efefd31b is described below
commit efefd31b5155436b36e53101e92a51ada2f240b0
Author: Swathi Chandrashekar <[email protected]>
AuthorDate: Wed Dec 14 20:32:03 2022 +0530
[FLINK-30361] Do not redeploy task managers when scaling standalone session
cluster
---
.../operator/api/spec/TaskManagerSpec.java | 9 ++-
.../AbstractFlinkResourceReconciler.java | 38 +++++++++++-
.../deployment/AbstractJobReconciler.java | 14 -----
.../operator/service/StandaloneFlinkService.java | 4 +-
.../kubernetes/operator/TestingFlinkService.java | 4 +-
.../operator/reconciler/diff/SpecDiffTest.java | 13 ++++
.../service/StandaloneFlinkServiceTest.java | 72 +++++++++++++++++-----
7 files changed, 120 insertions(+), 34 deletions(-)
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
index 54ff749e..99058de6 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
@@ -18,6 +18,9 @@
package org.apache.flink.kubernetes.operator.api.spec;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.api.diff.DiffType;
+import org.apache.flink.kubernetes.operator.api.diff.Diffable;
+import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.model.annotation.SpecReplicas;
@@ -32,12 +35,14 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Builder
-public class TaskManagerSpec {
+public class TaskManagerSpec implements Diffable<TaskManagerSpec> {
/** Resource specification for the TaskManager pods. */
private Resource resource;
/** Number of TaskManager replicas. If defined, takes precedence over
parallelism */
- @SpecReplicas private Integer replicas;
+ @SpecDiff(DiffType.SCALE)
+ @SpecReplicas
+ private Integer replicas;
/** TaskManager pod template. It will be merged with
FlinkDeploymentSpec.podTemplate. */
private Pod podTemplate;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index d1c2c743..ea6a3048 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -158,7 +158,10 @@ public abstract class AbstractFlinkResourceReconciler<
EventRecorder.Component.JobManagerDeployment,
specChangeMessage);
}
- reconcileSpecChange(cr, ctx, observeConfig, deployConfig,
specDiff.getType());
+ boolean scale = scaleCluster(cr, ctx, observeConfig, deployConfig,
specDiff.getType());
+ if (!scale) {
+ reconcileSpecChange(cr, ctx, observeConfig, deployConfig,
specDiff.getType());
+ }
} else if (shouldRollBack(cr, observeConfig, flinkService)) {
// Rollbacks are executed in two steps, we initiate it first then
return
if (initiateRollBack(status)) {
@@ -316,6 +319,39 @@ public abstract class AbstractFlinkResourceReconciler<
return false;
}
+ /**
+ * Attempts an in-place scale of the cluster when the spec diff is a pure scaling change (e.g. a
+ * TaskManager replica update, or a parallelism change when the Flink service supports it, such
+ * as reactive scheduler mode for standalone application deployments) instead of a full upgrade.
+ *
+ * <p>Only {@link DiffType#SCALE} diffs are considered. For any other diff type this returns
+ * {@code false} without calling the Flink service, and the caller falls back to
+ * {@code reconcileSpecChange}.
+ *
+ * @param cr Resource being reconciled.
+ * @param ctx Reconciliation context.
+ * @param observeConfig Observe configuration (currently not used by this method).
+ * @param deployConfig Configuration to be deployed; passed to the Flink service scale call.
+ * @param diffType Spec change type.
+ * @return {@code true} if the diff was a scaling change and the Flink service reported that the
+ *     scale was applied (the deployed spec is then recorded in the status); {@code false}
+ *     otherwise.
+ * @throws Exception propagated from the underlying calls (Flink service lookup / scale, status
+ *     update).
+ */
+ private boolean scaleCluster(
+ CR cr,
+ Context<?> ctx,
+ Configuration observeConfig,
+ Configuration deployConfig,
+ DiffType diffType)
+ throws Exception {
+ // Non-scaling changes are never handled in place here; the caller reconciles them.
+ if (diffType != DiffType.SCALE) {
+ return false;
+ }
+ // The service decides whether in-place scaling is possible; it may return false
+ // (e.g. the standalone service requires reactive mode when a job spec is present).
+ boolean scaled =
+ getFlinkService(cr, ctx)
+ .scale(cr.getMetadata(), cr.getSpec().getJob(),
deployConfig);
+ if (scaled) {
+ LOG.info("Scaling succeeded");
+ // NOTE(review): presumably records this spec as the last reconciled/deployed one so the
+ // next reconcile loop does not see the same diff again; confirm in ReconciliationUtils.
+ ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);
+ return true;
+ }
+ return false;
+ }
+
/**
* Checks whether the currently deployed Flink resource spec should be
rolled back to the stable
* spec. This includes validating the current deployment status, config
and checking if the last
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 370d96dd..3b777e4e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -93,20 +93,6 @@ public abstract class AbstractJobReconciler<
SPEC lastReconciledSpec =
reconciliationStatus.deserializeLastReconciledSpec();
SPEC currentDeploySpec = resource.getSpec();
- if (diffType == DiffType.SCALE) {
- boolean scaled =
- getFlinkService(resource, ctx)
- .scale(
- resource.getMetadata(),
- resource.getSpec().getJob(),
- deployConfig);
- if (scaled) {
- LOG.info("Reactive scaling succeeded");
- ReconciliationUtils.updateStatusForDeployedSpec(resource,
deployConfig);
- return;
- }
- }
-
JobState currentJobState = lastReconciledSpec.getJob().getState();
JobState desiredJobState = currentDeploySpec.getJob().getState();
if (currentJobState == JobState.RUNNING) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index ad3e190e..de467879 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -23,6 +23,7 @@ import
org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -171,7 +172,8 @@ public class StandaloneFlinkService extends
AbstractFlinkService {
@Override
public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf)
{
- if (conf.get(JobManagerOptions.SCHEDULER_MODE) == null) {
+ if (conf.get(JobManagerOptions.SCHEDULER_MODE) !=
SchedulerExecutionMode.REACTIVE
+ && jobSpec != null) {
LOG.info("Reactive scaling is not enabled");
return false;
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index a78705df..97175b15 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -492,7 +493,8 @@ public class TestingFlinkService extends
AbstractFlinkService {
@Override
public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf)
{
- if (conf.get(JobManagerOptions.SCHEDULER_MODE) == null) {
+ if (conf.get(JobManagerOptions.SCHEDULER_MODE) !=
SchedulerExecutionMode.REACTIVE
+ && jobSpec != null) {
return false;
}
desiredReplicas =
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
index 4526add6..30c75e62 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
@@ -124,6 +124,19 @@ public class SpecDiffTest {
diff = new ReflectiveDiffBuilder<>(left, right).build();
assertEquals(DiffType.UPGRADE, diff.getType());
assertEquals(22, diff.getNumDiffs());
+ left.setMode(KubernetesDeploymentMode.STANDALONE);
+ left.getTaskManager().setReplicas(2);
+ left.getTaskManager().getResource().setMemory("1024");
+ right = SpecUtils.clone(left);
+ right.getTaskManager().setReplicas(3);
+ diff = new ReflectiveDiffBuilder<>(left, right).build();
+ assertEquals(DiffType.SCALE, diff.getType());
+ assertEquals(1, diff.getNumDiffs());
+ right.getTaskManager().getResource().setMemory("2048");
+ right.getTaskManager().setReplicas(4);
+ diff = new ReflectiveDiffBuilder<>(left, right).build();
+ assertEquals(DiffType.UPGRADE, diff.getType());
+ assertEquals(2, diff.getNumDiffs());
}
@Test
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index b406220f..716665ed 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -108,26 +108,28 @@ public class StandaloneFlinkServiceTest {
}
@Test
- public void testReactiveScale() throws Exception {
+ public void testTMReplicaScaleApplication() throws Exception {
var flinkDeployment = TestUtils.buildApplicationCluster();
var clusterId = flinkDeployment.getMetadata().getName();
var namespace = flinkDeployment.getMetadata().getNamespace();
flinkDeployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+
+ // Add parallelism change, verify it is honoured in reactive mode
flinkDeployment
.getSpec()
.getFlinkConfiguration()
.put(
JobManagerOptions.SCHEDULER_MODE.key(),
SchedulerExecutionMode.REACTIVE.name());
+ flinkDeployment.getSpec().getJob().setParallelism(4);
createDeployments(flinkDeployment);
assertTrue(
flinkStandaloneService.scale(
flinkDeployment.getMetadata(),
flinkDeployment.getSpec().getJob(),
buildConfig(flinkDeployment, configuration)));
-
assertEquals(
- 1,
+ 2,
kubernetesClient
.apps()
.deployments()
@@ -137,7 +139,9 @@ public class StandaloneFlinkServiceTest {
.getSpec()
.getReplicas());
- flinkDeployment.getSpec().getJob().setParallelism(4);
+ // Add parallelism and replica change, verify if replica change is
honoured in reactive mode
+ flinkDeployment.getSpec().getJob().setParallelism(100);
+ flinkDeployment.getSpec().getTaskManager().setReplicas(2);
assertTrue(
flinkStandaloneService.scale(
flinkDeployment.getMetadata(),
@@ -154,18 +158,35 @@ public class StandaloneFlinkServiceTest {
.getSpec()
.getReplicas());
- kubernetesClient
- .apps()
- .deployments()
- .inNamespace(namespace)
-
.withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
- .delete();
+ // Verify that any change in parallelism doesnt scale the cluster
without reactive mode
+ flinkDeployment.getSpec().getJob().setParallelism(100);
+ flinkDeployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .remove(JobManagerOptions.SCHEDULER_MODE.key());
+ assertFalse(
+ flinkStandaloneService.scale(
+ flinkDeployment.getMetadata(),
+ flinkDeployment.getSpec().getJob(),
+ buildConfig(flinkDeployment, configuration)));
+
+ // Add replicas and verify that the scaling is not honoured as
reactive mode not enabled
+ flinkDeployment.getSpec().getTaskManager().setReplicas(10);
assertFalse(
flinkStandaloneService.scale(
flinkDeployment.getMetadata(),
flinkDeployment.getSpec().getJob(),
buildConfig(flinkDeployment, configuration)));
+ }
+ @Test
+ public void testTMReplicaScaleSession() throws Exception {
+ var flinkDeployment = TestUtils.buildSessionCluster();
+ var clusterId = flinkDeployment.getMetadata().getName();
+ var namespace = flinkDeployment.getMetadata().getNamespace();
+ flinkDeployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+ // Add replicas
+ flinkDeployment.getSpec().getTaskManager().setReplicas(3);
createDeployments(flinkDeployment);
assertTrue(
flinkStandaloneService.scale(
@@ -173,15 +194,36 @@ public class StandaloneFlinkServiceTest {
flinkDeployment.getSpec().getJob(),
buildConfig(flinkDeployment, configuration)));
- flinkDeployment
- .getSpec()
- .getFlinkConfiguration()
- .remove(JobManagerOptions.SCHEDULER_MODE.key());
- assertFalse(
+ assertEquals(
+ 3,
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+
.withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+ .get()
+ .getSpec()
+ .getReplicas());
+
+ // Scale the replica count of the task managers
+ flinkDeployment.getSpec().getTaskManager().setReplicas(10);
+ createDeployments(flinkDeployment);
+ assertTrue(
flinkStandaloneService.scale(
flinkDeployment.getMetadata(),
flinkDeployment.getSpec().getJob(),
buildConfig(flinkDeployment, configuration)));
+
+ assertEquals(
+ 10,
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+
.withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+ .get()
+ .getSpec()
+ .getReplicas());
}
@Test