Dennis-Mircea commented on code in PR #1146:
URL: https://github.com/apache/flink-kubernetes-operator/pull/1146#discussion_r3511079146


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java:
##########
@@ -87,6 +88,14 @@ private void observeClusterInfo(FlinkResourceContext<FlinkDeployment> ctx) {
                                     flinkApp.getStatus().getJobStatus().getJobId());
             flinkApp.getStatus().getClusterInfo().putAll(clusterInfo);
             logger.debug("ClusterInfo: {}", flinkApp.getStatus().getClusterInfo());
+
+            // Report the actual number of TaskManagers registered with the running cluster rather
+            // than a value derived from the spec, so the status reflects dynamically changing
+            // deployments (e.g. standalone reactive mode). See FLINK-37448.
+            var replicas = ctx.getFlinkService().getTaskManagerReplicas(ctx.getObserveConfig());

Review Comment:
   Adding this call here triggers an extra, unnecessary REST call, because 
`ctx.getFlinkService().getClusterInfo(...)` already makes one. This should be 
integrated into the `getClusterInfo(...)` method.
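
To illustrate the suggestion, here is a minimal sketch of returning the TaskManager count from the same fetch that builds the cluster info map. It assumes the response already fetched for the cluster info also carries the TaskManager count. `ClusterInfoSketch`, `ClusterOverview`, `ObservedClusterInfo`, and the `Supplier`-based REST stand-in are hypothetical names for illustration only, not the operator's actual API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch; none of these types exist in the operator. */
final class ClusterInfoSketch {

    /** Stand-in for the payload returned by one REST round trip (assumed shape). */
    static final class ClusterOverview {
        final String flinkVersion;
        final int numTaskManagers;

        ClusterOverview(String flinkVersion, int numTaskManagers) {
            this.flinkVersion = flinkVersion;
            this.numTaskManagers = numTaskManagers;
        }
    }

    /** Hypothetical result carrying both the info map and the replica count. */
    static final class ObservedClusterInfo {
        final Map<String, String> info;
        final int taskManagerReplicas;

        ObservedClusterInfo(Map<String, String> info, int taskManagerReplicas) {
            this.info = info;
            this.taskManagerReplicas = taskManagerReplicas;
        }
    }

    /** Derives both values from a single invocation of the REST stand-in. */
    static ObservedClusterInfo getClusterInfo(Supplier<ClusterOverview> restCall) {
        ClusterOverview overview = restCall.get(); // the only REST call in this method
        Map<String, String> info = new HashMap<>();
        info.put("flink-version", overview.flinkVersion);
        return new ObservedClusterInfo(info, overview.numTaskManagers);
    }
}
```

With a shape like this, the observer could read the replica count from the returned object instead of calling a separate `getTaskManagerReplicas(...)` method.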



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java:
##########
@@ -317,9 +317,11 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro
         var jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("s0", jobs.get(0).f0);
+        // The actual replica count is populated during observation from the running cluster; right
+        // after reconciliation (cluster not yet observed) it stays 0 (FLINK-37448).

Review Comment:
   No need for this comment.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java:
##########
@@ -87,6 +88,14 @@ private void observeClusterInfo(FlinkResourceContext<FlinkDeployment> ctx) {
                                     flinkApp.getStatus().getJobStatus().getJobId());
             flinkApp.getStatus().getClusterInfo().putAll(clusterInfo);
             logger.debug("ClusterInfo: {}", flinkApp.getStatus().getClusterInfo());
+
+            // Report the actual number of TaskManagers registered with the running cluster rather
+            // than a value derived from the spec, so the status reflects dynamically changing
+            // deployments (e.g. standalone reactive mode). See FLINK-37448.

Review Comment:
   This comment is unnecessarily verbose and can be removed entirely.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -239,11 +237,12 @@ private static void updateLastReconciledJobSpec(
         }
     }
 
-    private static TaskManagerInfo getTaskManagerInfo(
-            String name, Configuration conf, JobState jobState) {
-        var labelSelector = "component=taskmanager,app=" + name;
+    private static TaskManagerInfo getTaskManagerInfo(String name, JobState jobState) {
         if (jobState == JobState.RUNNING) {
-            return new TaskManagerInfo(labelSelector, FlinkUtils.getNumTaskManagers(conf));
+            // The actual replica count is populated during observation from the running cluster
+            // (see AbstractFlinkDeploymentObserver#observeClusterInfo). Here we only set the label
+            // selector; the replica count stays 0 until the cluster is observed.

Review Comment:
   Unnecessary comment verbosity.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -239,11 +237,12 @@ private static void updateLastReconciledJobSpec(
         }
     }
 
-    private static TaskManagerInfo getTaskManagerInfo(
-            String name, Configuration conf, JobState jobState) {
-        var labelSelector = "component=taskmanager,app=" + name;
+    private static TaskManagerInfo getTaskManagerInfo(String name, JobState jobState) {
         if (jobState == JobState.RUNNING) {
-            return new TaskManagerInfo(labelSelector, FlinkUtils.getNumTaskManagers(conf));
+            // The actual replica count is populated during observation from the running cluster
+            // (see AbstractFlinkDeploymentObserver#observeClusterInfo). Here we only set the label
+            // selector; the replica count stays 0 until the cluster is observed.
+            return new TaskManagerInfo(FlinkUtils.getTaskManagerLabelSelector(name), 0);

Review Comment:
   This now resets replicas to 0 whenever the job is RUNNING, and the value 
stays 0 until the next observe populates it. In steady state this is fine, but 
right after every upgrade or spec change the scale subresource reports 0 until 
the next observe loop, which does not look right.
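
One possible way to avoid the temporary 0, sketched under assumptions and not taken from the PR: carry the last observed replica count over when the status is rebuilt during reconciliation. `ReplicaCarryOverSketch`, its local `TaskManagerInfo` stand-in, and `onReconcile` are hypothetical names. The label selector format comes from the removed line in the diff above, and the empty result for a non-running job is an assumption.

```java
/** Hypothetical sketch; the real TaskManagerInfo and reconciler code may differ. */
final class ReplicaCarryOverSketch {

    /** Minimal stand-in for the status object behind the scale subresource. */
    static final class TaskManagerInfo {
        final String labelSelector;
        final int replicas;

        TaskManagerInfo(String labelSelector, int replicas) {
            this.labelSelector = labelSelector;
            this.replicas = replicas;
        }
    }

    /**
     * Rebuilds the TaskManager info on reconcile. When the job is running, the
     * previously observed replica count is kept instead of being reset to 0;
     * 0 is used only when nothing has been observed yet.
     */
    static TaskManagerInfo onReconcile(String name, boolean running, TaskManagerInfo previous) {
        if (!running) {
            // Assumption: a non-running job reports no selector and no replicas.
            return new TaskManagerInfo("", 0);
        }
        int replicas = previous == null ? 0 : previous.replicas;
        return new TaskManagerInfo("component=taskmanager,app=" + name, replicas);
    }
}
```

The next observe loop would still overwrite the carried-over value with the actual count, so the scale subresource would only ever show a stale number, never a spurious 0.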



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
