gaborgsomogyi commented on code in PR #614:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/614#discussion_r1230837995


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##########
@@ -132,4 +165,153 @@ protected void deleteClusterInternal(
             deleteHAData(namespace, clusterId, conf);
         }
     }
+
+    @Override
+    public boolean scale(FlinkResourceContext<?> ctx) throws Exception {
+        var resource = ctx.getResource();
+        var spec = resource.getSpec();
+
+        var observeConfig = ctx.getObserveConfig();
+
+        if (spec.getJob() == null
+                || !observeConfig.get(
+                        
KubernetesOperatorConfigOptions.JOB_UPGRADE_INPLACE_SCALING_ENABLED)) {
+            return false;
+        }
+
+        if 
(!observeConfig.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_17)) {
+            LOG.debug("In-place rescaling is only available starting from 
Flink 1.18");
+            return false;
+        }
+
+        if (!observeConfig
+                .get(JobManagerOptions.SCHEDULER)
+                .equals(JobManagerOptions.SchedulerType.Adaptive)) {
+            LOG.debug("In-place rescaling is only available with the adaptive 
scheduler");
+            return false;
+        }
+
+        var status = resource.getStatus();
+        if (ReconciliationUtils.isJobInTerminalState(status)
+                || 
JobStatus.RECONCILING.name().equals(status.getJobStatus().getState())) {
+            LOG.info("Job in terminal or reconciling state cannot be scaled 
in-place");

Review Comment:
   +1 to having either such a log message or a comment. Explanations like this
make the logic easier to follow.



##########
examples/autoscaling/Dockerfile:
##########
@@ -16,5 +16,5 @@
 # limitations under the License.
 
################################################################################
 
-FROM flink:1.17
+FROM ghcr.io/apache/flink-docker:1.18-SNAPSHOT-scala_2.12-java11-debian

Review Comment:
   I know there is no 1.18 release yet, but using a snapshot image can make
tests fail randomly.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java:
##########
@@ -233,42 +233,41 @@ private void onMissingDeployment(FlinkDeployment 
deployment) {
     }
 
     @Override
-    protected void updateStatusToDeployedIfAlreadyUpgraded(
-            FlinkResourceContext<FlinkDeployment> ctx) {
+    protected boolean 
checkIfAlreadyUpgraded(FlinkResourceContext<FlinkDeployment> ctx) {
         var flinkDep = ctx.getResource();
         var status = flinkDep.getStatus();
+
+        // We are performing a full upgrade
         Optional<Deployment> depOpt = 
ctx.getJosdkContext().getSecondaryResource(Deployment.class);
-        depOpt.ifPresent(
-                deployment -> {
-                    Map<String, String> annotations = 
deployment.getMetadata().getAnnotations();
-                    if (annotations == null) {
-                        return;
-                    }
-                    Long deployedGeneration =
-                            
Optional.ofNullable(annotations.get(FlinkUtils.CR_GENERATION_LABEL))
-                                    .map(Long::valueOf)
-                                    .orElse(-1L);
-
-                    Long upgradeTargetGeneration =
-                            
ReconciliationUtils.getUpgradeTargetGeneration(flinkDep);
-
-                    if (deployedGeneration.equals(upgradeTargetGeneration)) {
-                        logger.info("Pending upgrade is already deployed, 
updating status.");
-                        
ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkDep);
-                        if (flinkDep.getSpec().getJob() != null) {
-                            status.getJobStatus()
-                                    .setState(
-                                            
org.apache.flink.api.common.JobStatus.RECONCILING
-                                                    .name());
-                        }
-                        
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
-                    } else {
-                        logger.warn(
-                                "Running deployment generation {} doesn't 
match upgrade target generation {}.",
-                                deployedGeneration,
-                                upgradeTargetGeneration);
-                    }
-                });
+
+        if (!depOpt.isPresent()) {
+            return false;

Review Comment:
   At the other `return false/true` sites there is an explanation of what
happens. Maybe we can add one everywhere. This applies not just here but to
all boolean exits.



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java:
##########
@@ -547,14 +553,30 @@ public Map<String, String> getClusterInfo(Configuration 
conf) {
     }
 
     @Override
-    public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) 
{
-        if (conf.get(JobManagerOptions.SCHEDULER_MODE) != 
SchedulerExecutionMode.REACTIVE
-                && jobSpec != null) {
+    public boolean scale(FlinkResourceContext<?> ctx) {
+        boolean standalone = ctx.getDeploymentMode() == 
KubernetesDeploymentMode.STANDALONE;
+        boolean session = ctx.getResource().getSpec().getJob() == null;
+        if (!standalone) {
             return false;
         }
-        desiredReplicas =
-                
conf.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
-        return true;
+
+        if (session
+                || ctx.getObserveConfig().get(JobManagerOptions.SCHEDULER_MODE)
+                        == SchedulerExecutionMode.REACTIVE) {
+            desiredReplicas =
+                    ctx.getDeployConfig(ctx.getResource().getSpec())
+                            .get(
+                                    StandaloneKubernetesConfigOptionsInternal
+                                            .KUBERNETES_TASKMANAGER_REPLICAS);
+            return true;
+        }
+
+        return false;

Review Comment:
   Unless we want to add comments/logs, it's better to have a single if
statement for easier understanding:
   ```
   if (ctx.getDeploymentMode() == KubernetesDeploymentMode.STANDALONE
           && (ctx.getResource().getSpec().getJob() == null
                   || ctx.getObserveConfig().get(JobManagerOptions.SCHEDULER_MODE)
                           == SchedulerExecutionMode.REACTIVE))
   ```
   



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java:
##########
@@ -112,13 +119,33 @@ public TestingFlinkDeploymentController(
     public UpdateControl<FlinkDeployment> reconcile(
             FlinkDeployment flinkDeployment, Context<FlinkDeployment> context) 
throws Exception {
         FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment);
+        updateGeneration(cloned);
         statusUpdateCounter.setCurrent(flinkDeployment);
         UpdateControl<FlinkDeployment> updateControl =
                 flinkDeploymentController.reconcile(cloned, context);
         Assertions.assertTrue(updateControl.isNoUpdate());
         return updateControl;
     }
 
+    private void updateGeneration(FlinkDeployment resource) {

Review Comment:
   Is there any difference compared to the one in
TestingFlinkSessionJobController.java, or is it just copy/paste?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to