gyfora commented on code in PR #614:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/614#discussion_r1231034719
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java:
##########
@@ -233,42 +233,41 @@ private void onMissingDeployment(FlinkDeployment
deployment) {
}
@Override
- protected void updateStatusToDeployedIfAlreadyUpgraded(
- FlinkResourceContext<FlinkDeployment> ctx) {
+ protected boolean
checkIfAlreadyUpgraded(FlinkResourceContext<FlinkDeployment> ctx) {
var flinkDep = ctx.getResource();
var status = flinkDep.getStatus();
+
+ // We are performing a full upgrade
Optional<Deployment> depOpt =
ctx.getJosdkContext().getSecondaryResource(Deployment.class);
- depOpt.ifPresent(
- deployment -> {
- Map<String, String> annotations =
deployment.getMetadata().getAnnotations();
- if (annotations == null) {
- return;
- }
- Long deployedGeneration =
-
Optional.ofNullable(annotations.get(FlinkUtils.CR_GENERATION_LABEL))
- .map(Long::valueOf)
- .orElse(-1L);
-
- Long upgradeTargetGeneration =
-
ReconciliationUtils.getUpgradeTargetGeneration(flinkDep);
-
- if (deployedGeneration.equals(upgradeTargetGeneration)) {
- logger.info("Pending upgrade is already deployed,
updating status.");
-
ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkDep);
- if (flinkDep.getSpec().getJob() != null) {
- status.getJobStatus()
- .setState(
-
org.apache.flink.api.common.JobStatus.RECONCILING
- .name());
- }
-
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
- } else {
- logger.warn(
- "Running deployment generation {} doesn't
match upgrade target generation {}.",
- deployedGeneration,
- upgradeTargetGeneration);
- }
- });
+
+ if (!depOpt.isPresent()) {
+ return false;
Review Comment:
done
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java:
##########
@@ -547,14 +553,30 @@ public Map<String, String> getClusterInfo(Configuration
conf) {
}
@Override
- public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf)
{
- if (conf.get(JobManagerOptions.SCHEDULER_MODE) !=
SchedulerExecutionMode.REACTIVE
- && jobSpec != null) {
+ public boolean scale(FlinkResourceContext<?> ctx) {
+ boolean standalone = ctx.getDeploymentMode() ==
KubernetesDeploymentMode.STANDALONE;
+ boolean session = ctx.getResource().getSpec().getJob() == null;
+ if (!standalone) {
return false;
}
- desiredReplicas =
-
conf.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
- return true;
+
+ if (session
+ || ctx.getObserveConfig().get(JobManagerOptions.SCHEDULER_MODE)
+ == SchedulerExecutionMode.REACTIVE) {
+ desiredReplicas =
+ ctx.getDeployConfig(ctx.getResource().getSpec())
+ .get(
+ StandaloneKubernetesConfigOptionsInternal
+ .KUBERNETES_TASKMANAGER_REPLICAS);
+ return true;
+ }
+
+ return false;
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]