This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new efefd31b [FLINK-30361] Do not redeploy task managers when scaling standalone session cluster
efefd31b is described below

commit efefd31b5155436b36e53101e92a51ada2f240b0
Author: Swathi Chandrashekar <[email protected]>
AuthorDate: Wed Dec 14 20:32:03 2022 +0530

    [FLINK-30361] Do not redeploy task managers when scaling standalone session cluster
---
 .../operator/api/spec/TaskManagerSpec.java         |  9 ++-
 .../AbstractFlinkResourceReconciler.java           | 38 +++++++++++-
 .../deployment/AbstractJobReconciler.java          | 14 -----
 .../operator/service/StandaloneFlinkService.java   |  4 +-
 .../kubernetes/operator/TestingFlinkService.java   |  4 +-
 .../operator/reconciler/diff/SpecDiffTest.java     | 13 ++++
 .../service/StandaloneFlinkServiceTest.java        | 72 +++++++++++++++++-----
 7 files changed, 120 insertions(+), 34 deletions(-)

diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
index 54ff749e..99058de6 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/TaskManagerSpec.java
@@ -18,6 +18,9 @@
 package org.apache.flink.kubernetes.operator.api.spec;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.api.diff.DiffType;
+import org.apache.flink.kubernetes.operator.api.diff.Diffable;
+import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
 
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.model.annotation.SpecReplicas;
@@ -32,12 +35,14 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @AllArgsConstructor
 @Builder
-public class TaskManagerSpec {
+public class TaskManagerSpec implements Diffable<TaskManagerSpec> {
     /** Resource specification for the TaskManager pods. */
     private Resource resource;
 
     /** Number of TaskManager replicas. If defined, takes precedence over parallelism */
-    @SpecReplicas private Integer replicas;
+    @SpecDiff(DiffType.SCALE)
+    @SpecReplicas
+    private Integer replicas;
 
     /** TaskManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
     private Pod podTemplate;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index d1c2c743..ea6a3048 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -158,7 +158,10 @@ public abstract class AbstractFlinkResourceReconciler<
                         EventRecorder.Component.JobManagerDeployment,
                         specChangeMessage);
             }
-            reconcileSpecChange(cr, ctx, observeConfig, deployConfig, specDiff.getType());
+            boolean scale = scaleCluster(cr, ctx, observeConfig, deployConfig, specDiff.getType());
+            if (!scale) {
+                reconcileSpecChange(cr, ctx, observeConfig, deployConfig, specDiff.getType());
+            }
+            }
         } else if (shouldRollBack(cr, observeConfig, flinkService)) {
             // Rollbacks are executed in two steps, we initiate it first then return
             if (initiateRollBack(status)) {
@@ -316,6 +319,39 @@ public abstract class AbstractFlinkResourceReconciler<
         return false;
     }
 
+    /**
+     * Scale the cluster whenever there is a scaling change, based on the task manager replica
+     * update or the parallelism in case of scheduler mode.
+     *
+     * @param cr Resource being reconciled.
+     * @param ctx Reconciliation context.
+     * @param observeConfig Observe configuration.
+     * @param deployConfig Configuration to be deployed.
+     * @param diffType Spec change type.
+     * @return True if the scaling is successful
+     * @throws Exception
+     */
+    private boolean scaleCluster(
+            CR cr,
+            Context<?> ctx,
+            Configuration observeConfig,
+            Configuration deployConfig,
+            DiffType diffType)
+            throws Exception {
+        if (diffType != DiffType.SCALE) {
+            return false;
+        }
+        boolean scaled =
+                getFlinkService(cr, ctx)
+                        .scale(cr.getMetadata(), cr.getSpec().getJob(), deployConfig);
+        if (scaled) {
+            LOG.info("Scaling succeeded");
+            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);
+            return true;
+        }
+        return false;
+    }
+
     /**
      * Checks whether the currently deployed Flink resource spec should be rolled back to the stable
      * spec. This includes validating the current deployment status, config and checking if the last
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 370d96dd..3b777e4e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -93,20 +93,6 @@ public abstract class AbstractJobReconciler<
         SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
         SPEC currentDeploySpec = resource.getSpec();
 
-        if (diffType == DiffType.SCALE) {
-            boolean scaled =
-                    getFlinkService(resource, ctx)
-                            .scale(
-                                    resource.getMetadata(),
-                                    resource.getSpec().getJob(),
-                                    deployConfig);
-            if (scaled) {
-                LOG.info("Reactive scaling succeeded");
-                ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
-                return;
-            }
-        }
-
         JobState currentJobState = lastReconciledSpec.getJob().getState();
         JobState desiredJobState = currentDeploySpec.getJob().getState();
         if (currentJobState == JobState.RUNNING) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index ad3e190e..de467879 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -171,7 +172,8 @@ public class StandaloneFlinkService extends AbstractFlinkService {
 
     @Override
     public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) {
-        if (conf.get(JobManagerOptions.SCHEDULER_MODE) == null) {
+        if (conf.get(JobManagerOptions.SCHEDULER_MODE) != SchedulerExecutionMode.REACTIVE
+                && jobSpec != null) {
             LOG.info("Reactive scaling is not enabled");
             return false;
         }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index a78705df..97175b15 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -492,7 +493,8 @@ public class TestingFlinkService extends AbstractFlinkService {
 
     @Override
     public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) {
-        if (conf.get(JobManagerOptions.SCHEDULER_MODE) == null) {
+        if (conf.get(JobManagerOptions.SCHEDULER_MODE) != SchedulerExecutionMode.REACTIVE
+                && jobSpec != null) {
             return false;
         }
         desiredReplicas =
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
index 4526add6..30c75e62 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java
@@ -124,6 +124,19 @@ public class SpecDiffTest {
         diff = new ReflectiveDiffBuilder<>(left, right).build();
         assertEquals(DiffType.UPGRADE, diff.getType());
         assertEquals(22, diff.getNumDiffs());
+        left.setMode(KubernetesDeploymentMode.STANDALONE);
+        left.getTaskManager().setReplicas(2);
+        left.getTaskManager().getResource().setMemory("1024");
+        right = SpecUtils.clone(left);
+        right.getTaskManager().setReplicas(3);
+        diff = new ReflectiveDiffBuilder<>(left, right).build();
+        assertEquals(DiffType.SCALE, diff.getType());
+        assertEquals(1, diff.getNumDiffs());
+        right.getTaskManager().getResource().setMemory("2048");
+        right.getTaskManager().setReplicas(4);
+        diff = new ReflectiveDiffBuilder<>(left, right).build();
+        assertEquals(DiffType.UPGRADE, diff.getType());
+        assertEquals(2, diff.getNumDiffs());
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index b406220f..716665ed 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -108,26 +108,28 @@ public class StandaloneFlinkServiceTest {
     }
 
     @Test
-    public void testReactiveScale() throws Exception {
+    public void testTMReplicaScaleApplication() throws Exception {
         var flinkDeployment = TestUtils.buildApplicationCluster();
         var clusterId = flinkDeployment.getMetadata().getName();
         var namespace = flinkDeployment.getMetadata().getNamespace();
         flinkDeployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+
+        // Add parallelism change, verify it is honoured in reactive mode
         flinkDeployment
                 .getSpec()
                 .getFlinkConfiguration()
                 .put(
                         JobManagerOptions.SCHEDULER_MODE.key(),
                         SchedulerExecutionMode.REACTIVE.name());
+        flinkDeployment.getSpec().getJob().setParallelism(4);
         createDeployments(flinkDeployment);
         assertTrue(
                 flinkStandaloneService.scale(
                         flinkDeployment.getMetadata(),
                         flinkDeployment.getSpec().getJob(),
                         buildConfig(flinkDeployment, configuration)));
-
         assertEquals(
-                1,
+                2,
                 kubernetesClient
                         .apps()
                         .deployments()
@@ -137,7 +139,9 @@ public class StandaloneFlinkServiceTest {
                         .getSpec()
                         .getReplicas());
 
-        flinkDeployment.getSpec().getJob().setParallelism(4);
+        // Add parallelism and replica change, verify if replica change is honoured in reactive mode
+        flinkDeployment.getSpec().getJob().setParallelism(100);
+        flinkDeployment.getSpec().getTaskManager().setReplicas(2);
         assertTrue(
                 flinkStandaloneService.scale(
                         flinkDeployment.getMetadata(),
@@ -154,18 +158,35 @@ public class StandaloneFlinkServiceTest {
                         .getSpec()
                         .getReplicas());
 
-        kubernetesClient
-                .apps()
-                .deployments()
-                .inNamespace(namespace)
-                .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
-                .delete();
+        // Verify that any change in parallelism doesnt scale the cluster without reactive mode
+        flinkDeployment.getSpec().getJob().setParallelism(100);
+        flinkDeployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .remove(JobManagerOptions.SCHEDULER_MODE.key());
+        assertFalse(
+                flinkStandaloneService.scale(
+                        flinkDeployment.getMetadata(),
+                        flinkDeployment.getSpec().getJob(),
+                        buildConfig(flinkDeployment, configuration)));
+
+        // Add replicas and verify that the scaling is not honoured as reactive mode not enabled
+        flinkDeployment.getSpec().getTaskManager().setReplicas(10);
         assertFalse(
                 flinkStandaloneService.scale(
                         flinkDeployment.getMetadata(),
                         flinkDeployment.getSpec().getJob(),
                         buildConfig(flinkDeployment, configuration)));
+    }
 
+    @Test
+    public void testTMReplicaScaleSession() throws Exception {
+        var flinkDeployment = TestUtils.buildSessionCluster();
+        var clusterId = flinkDeployment.getMetadata().getName();
+        var namespace = flinkDeployment.getMetadata().getNamespace();
+        flinkDeployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+        // Add replicas
+        flinkDeployment.getSpec().getTaskManager().setReplicas(3);
         createDeployments(flinkDeployment);
         assertTrue(
                 flinkStandaloneService.scale(
@@ -173,15 +194,36 @@ public class StandaloneFlinkServiceTest {
                         flinkDeployment.getSpec().getJob(),
                         buildConfig(flinkDeployment, configuration)));
 
-        flinkDeployment
-                .getSpec()
-                .getFlinkConfiguration()
-                .remove(JobManagerOptions.SCHEDULER_MODE.key());
-        assertFalse(
+        assertEquals(
+                3,
+                kubernetesClient
+                        .apps()
+                        .deployments()
+                        .inNamespace(namespace)
+                        .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+                        .get()
+                        .getSpec()
+                        .getReplicas());
+
+        // Scale the replica count of the task managers
+        flinkDeployment.getSpec().getTaskManager().setReplicas(10);
+        createDeployments(flinkDeployment);
+        assertTrue(
                 flinkStandaloneService.scale(
                         flinkDeployment.getMetadata(),
                         flinkDeployment.getSpec().getJob(),
                         buildConfig(flinkDeployment, configuration)));
+
+        assertEquals(
+                10,
+                kubernetesClient
+                        .apps()
+                        .deployments()
+                        .inNamespace(namespace)
+                        .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+                        .get()
+                        .getSpec()
+                        .getReplicas());
     }
 
     @Test

Reply via email to