Dennis-Mircea commented on code in PR #1146:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/1146#discussion_r3511079146
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java:
##########
@@ -87,6 +88,14 @@ private void observeClusterInfo(FlinkResourceContext<FlinkDeployment> ctx) {
flinkApp.getStatus().getJobStatus().getJobId());
flinkApp.getStatus().getClusterInfo().putAll(clusterInfo);
logger.debug("ClusterInfo: {}", flinkApp.getStatus().getClusterInfo());
+
+ // Report the actual number of TaskManagers registered with the running cluster rather
+ // than a value derived from the spec, so the status reflects dynamically changing
+ // deployments (e.g. standalone reactive mode). See FLINK-37448.
+ var replicas = ctx.getFlinkService().getTaskManagerReplicas(ctx.getObserveConfig());
Review Comment:
Adding this here introduces an extra, unnecessary REST call, since
`ctx.getFlinkService().getClusterInfo(...)` already makes one. This should be
integrated into the `getClusterInfo(...)` method.
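A minimal sketch of the suggested consolidation, in which a single response yields both the cluster-info entries and the TaskManager count. Every name here (`CountingRestClient`, `ClusterInfoResult`, this `getClusterInfo` signature) is hypothetical and is not the operator's actual API; the `flink-version` and `taskmanagers` keys are assumed to mirror Flink's `/overview` REST response and should be verified.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class ClusterInfoSketch {

    // Hypothetical result bundling the cluster-info map with the observed TaskManager count.
    public record ClusterInfoResult(Map<String, String> clusterInfo, int taskManagerReplicas) {}

    // Hypothetical REST client that counts how many requests it has sent.
    public static final class CountingRestClient {
        private final Supplier<Map<String, Object>> overview;
        private int requests = 0;

        public CountingRestClient(Supplier<Map<String, Object>> overview) {
            this.overview = overview;
        }

        public Map<String, Object> getOverview() {
            requests++;
            return overview.get();
        }

        public int requestCount() {
            return requests;
        }
    }

    // Derives both the cluster-info entries and the TaskManager count from one response.
    public static ClusterInfoResult getClusterInfo(CountingRestClient client) {
        Map<String, Object> overview = client.getOverview(); // the only REST call
        Map<String, String> info = new HashMap<>();
        info.put("flink-version", String.valueOf(overview.get("flink-version")));
        Object tms = overview.get("taskmanagers");
        int replicas = tms instanceof Number n ? n.intValue() : 0;
        return new ClusterInfoResult(info, replicas);
    }

    public static void main(String[] args) {
        var client = new CountingRestClient(
                () -> Map.of("flink-version", "1.20.0", "taskmanagers", 3));
        var result = getClusterInfo(client);
        System.out.println(result.taskManagerReplicas()); // 3
        System.out.println(client.requestCount()); // 1
    }
}
```

Returning the count alongside the map keeps the observer to one request per loop; how the real `getClusterInfo(...)` exposes the extra value is left to the PR author.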
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java:
##########
@@ -317,9 +317,11 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro
var jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
assertEquals("s0", jobs.get(0).f0);
+ // The actual replica count is populated during observation from the running cluster; right
+ // after reconciliation (cluster not yet observed) it stays 0 (FLINK-37448).
Review Comment:
No need for this comment.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java:
##########
@@ -87,6 +88,14 @@ private void observeClusterInfo(FlinkResourceContext<FlinkDeployment> ctx) {
flinkApp.getStatus().getJobStatus().getJobId());
flinkApp.getStatus().getClusterInfo().putAll(clusterInfo);
logger.debug("ClusterInfo: {}", flinkApp.getStatus().getClusterInfo());
+
+ // Report the actual number of TaskManagers registered with the running cluster rather
+ // than a value derived from the spec, so the status reflects dynamically changing
+ // deployments (e.g. standalone reactive mode). See FLINK-37448.
Review Comment:
This comment is unnecessarily verbose and can be removed entirely.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -239,11 +237,12 @@ private static void updateLastReconciledJobSpec(
}
}
- private static TaskManagerInfo getTaskManagerInfo(
- String name, Configuration conf, JobState jobState) {
- var labelSelector = "component=taskmanager,app=" + name;
+ private static TaskManagerInfo getTaskManagerInfo(String name, JobState jobState) {
if (jobState == JobState.RUNNING) {
- return new TaskManagerInfo(labelSelector, FlinkUtils.getNumTaskManagers(conf));
+ // The actual replica count is populated during observation from the running cluster
+ // (see AbstractFlinkDeploymentObserver#observeClusterInfo). Here we only set the label
+ // selector; the replica count stays 0 until the cluster is observed.
Review Comment:
This comment is unnecessarily verbose.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -239,11 +237,12 @@ private static void updateLastReconciledJobSpec(
}
}
- private static TaskManagerInfo getTaskManagerInfo(
- String name, Configuration conf, JobState jobState) {
- var labelSelector = "component=taskmanager,app=" + name;
+ private static TaskManagerInfo getTaskManagerInfo(String name, JobState jobState) {
if (jobState == JobState.RUNNING) {
- return new TaskManagerInfo(labelSelector, FlinkUtils.getNumTaskManagers(conf));
+ // The actual replica count is populated during observation from the running cluster
+ // (see AbstractFlinkDeploymentObserver#observeClusterInfo). Here we only set the label
+ // selector; the replica count stays 0 until the cluster is observed.
+ return new TaskManagerInfo(FlinkUtils.getTaskManagerLabelSelector(name), 0);
Review Comment:
This now resets the replica count to 0 when the job is RUNNING, and it stays 0
until the next observe populates it. In steady state this is fine, but right
after every upgrade or spec change the scale subresource reports 0 until the
next observe loop. This doesn't look right.
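One possible direction, sketched under stated assumptions: rather than writing 0, keep the last observed replica count and fall back to a spec-derived value only when nothing has been observed yet. `TaskManagerInfo` and `JobState` here are simplified hypothetical stand-ins, the `specDerivedReplicas` parameter stands in for what `FlinkUtils.getNumTaskManagers(conf)` supplied before this change, and the non-RUNNING branch (empty selector, 0 replicas) is an assumption about the existing code.

```java
public class TaskManagerInfoSketch {

    // Hypothetical, simplified stand-in for the operator's TaskManagerInfo status type.
    public record TaskManagerInfo(String labelSelector, int replicas) {}

    // Hypothetical subset of job states.
    public enum JobState { RUNNING, SUSPENDED }

    // Label selector format taken from the code under review.
    public static String labelSelector(String name) {
        return "component=taskmanager,app=" + name;
    }

    public static TaskManagerInfo getTaskManagerInfo(
            String name, JobState jobState, TaskManagerInfo previous, int specDerivedReplicas) {
        if (jobState != JobState.RUNNING) {
            // Assumption: non-running jobs report no TaskManagers.
            return new TaskManagerInfo("", 0);
        }
        // Prefer the last observed count instead of resetting to 0; use the
        // spec-derived count only when no positive count has been observed yet.
        int replicas =
                previous != null && previous.replicas() > 0
                        ? previous.replicas()
                        : specDerivedReplicas;
        return new TaskManagerInfo(labelSelector(name), replicas);
    }

    public static void main(String[] args) {
        var previous = new TaskManagerInfo(labelSelector("demo"), 4);
        System.out.println(getTaskManagerInfo("demo", JobState.RUNNING, previous, 2).replicas()); // 4
        System.out.println(getTaskManagerInfo("demo", JobState.RUNNING, null, 2).replicas()); // 2
    }
}
```

The fallback order is a judgment call: preferring the last observed value suits reactive mode, but right after a spec change that rescales the cluster it can report a stale count until the next observe loop.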
--
This is an automated message from the Apache Git Service.