gyfora commented on code in PR #614:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/614#discussion_r1231034719


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java:
##########
@@ -233,42 +233,41 @@ private void onMissingDeployment(FlinkDeployment 
deployment) {
     }
 
     @Override
-    protected void updateStatusToDeployedIfAlreadyUpgraded(
-            FlinkResourceContext<FlinkDeployment> ctx) {
+    protected boolean 
checkIfAlreadyUpgraded(FlinkResourceContext<FlinkDeployment> ctx) {
         var flinkDep = ctx.getResource();
         var status = flinkDep.getStatus();
+
+        // We are performing a full upgrade
         Optional<Deployment> depOpt = 
ctx.getJosdkContext().getSecondaryResource(Deployment.class);
-        depOpt.ifPresent(
-                deployment -> {
-                    Map<String, String> annotations = 
deployment.getMetadata().getAnnotations();
-                    if (annotations == null) {
-                        return;
-                    }
-                    Long deployedGeneration =
-                            
Optional.ofNullable(annotations.get(FlinkUtils.CR_GENERATION_LABEL))
-                                    .map(Long::valueOf)
-                                    .orElse(-1L);
-
-                    Long upgradeTargetGeneration =
-                            
ReconciliationUtils.getUpgradeTargetGeneration(flinkDep);
-
-                    if (deployedGeneration.equals(upgradeTargetGeneration)) {
-                        logger.info("Pending upgrade is already deployed, 
updating status.");
-                        
ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkDep);
-                        if (flinkDep.getSpec().getJob() != null) {
-                            status.getJobStatus()
-                                    .setState(
-                                            
org.apache.flink.api.common.JobStatus.RECONCILING
-                                                    .name());
-                        }
-                        
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
-                    } else {
-                        logger.warn(
-                                "Running deployment generation {} doesn't 
match upgrade target generation {}.",
-                                deployedGeneration,
-                                upgradeTargetGeneration);
-                    }
-                });
+
+        if (!depOpt.isPresent()) {
+            return false;

Review Comment:
   done



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java:
##########
@@ -547,14 +553,30 @@ public Map<String, String> getClusterInfo(Configuration 
conf) {
     }
 
     @Override
-    public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) 
{
-        if (conf.get(JobManagerOptions.SCHEDULER_MODE) != 
SchedulerExecutionMode.REACTIVE
-                && jobSpec != null) {
+    public boolean scale(FlinkResourceContext<?> ctx) {
+        boolean standalone = ctx.getDeploymentMode() == 
KubernetesDeploymentMode.STANDALONE;
+        boolean session = ctx.getResource().getSpec().getJob() == null;
+        if (!standalone) {
             return false;
         }
-        desiredReplicas =
-                
conf.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
-        return true;
+
+        if (session
+                || ctx.getObserveConfig().get(JobManagerOptions.SCHEDULER_MODE)
+                        == SchedulerExecutionMode.REACTIVE) {
+            desiredReplicas =
+                    ctx.getDeployConfig(ctx.getResource().getSpec())
+                            .get(
+                                    StandaloneKubernetesConfigOptionsInternal
+                                            .KUBERNETES_TASKMANAGER_REPLICAS);
+            return true;
+        }
+
+        return false;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to