gaborgsomogyi commented on code in PR #614:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/614#discussion_r1230837995
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##########
@@ -132,4 +165,153 @@ protected void deleteClusterInternal(
deleteHAData(namespace, clusterId, conf);
}
}
+
+ @Override
+ public boolean scale(FlinkResourceContext<?> ctx) throws Exception {
+ var resource = ctx.getResource();
+ var spec = resource.getSpec();
+
+ var observeConfig = ctx.getObserveConfig();
+
+ if (spec.getJob() == null
+ || !observeConfig.get(
+
KubernetesOperatorConfigOptions.JOB_UPGRADE_INPLACE_SCALING_ENABLED)) {
+ return false;
+ }
+
+ if
(!observeConfig.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_17)) {
+ LOG.debug("In-place rescaling is only available starting from
Flink 1.18");
+ return false;
+ }
+
+ if (!observeConfig
+ .get(JobManagerOptions.SCHEDULER)
+ .equals(JobManagerOptions.SchedulerType.Adaptive)) {
+ LOG.debug("In-place rescaling is only available with the adaptive
scheduler");
+ return false;
+ }
+
+ var status = resource.getStatus();
+ if (ReconciliationUtils.isJobInTerminalState(status)
+ ||
JobStatus.RECONCILING.name().equals(status.getJobStatus().getState())) {
+ LOG.info("Job in terminal or reconciling state cannot be scaled
in-place");
Review Comment:
+1 to have either such log or comment. One can follow the logic better w/
such explanations.
##########
examples/autoscaling/Dockerfile:
##########
@@ -16,5 +16,5 @@
# limitations under the License.
################################################################################
-FROM flink:1.17
+FROM ghcr.io/apache/flink-docker:1.18-SNAPSHOT-scala_2.12-java11-debian
Review Comment:
I know there is no 1.18 release yet, but using a snapshot can make tests fail
randomly.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java:
##########
@@ -233,42 +233,41 @@ private void onMissingDeployment(FlinkDeployment
deployment) {
}
@Override
- protected void updateStatusToDeployedIfAlreadyUpgraded(
- FlinkResourceContext<FlinkDeployment> ctx) {
+ protected boolean
checkIfAlreadyUpgraded(FlinkResourceContext<FlinkDeployment> ctx) {
var flinkDep = ctx.getResource();
var status = flinkDep.getStatus();
+
+ // We are performing a full upgrade
Optional<Deployment> depOpt =
ctx.getJosdkContext().getSecondaryResource(Deployment.class);
- depOpt.ifPresent(
- deployment -> {
- Map<String, String> annotations =
deployment.getMetadata().getAnnotations();
- if (annotations == null) {
- return;
- }
- Long deployedGeneration =
-
Optional.ofNullable(annotations.get(FlinkUtils.CR_GENERATION_LABEL))
- .map(Long::valueOf)
- .orElse(-1L);
-
- Long upgradeTargetGeneration =
-
ReconciliationUtils.getUpgradeTargetGeneration(flinkDep);
-
- if (deployedGeneration.equals(upgradeTargetGeneration)) {
- logger.info("Pending upgrade is already deployed,
updating status.");
-
ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkDep);
- if (flinkDep.getSpec().getJob() != null) {
- status.getJobStatus()
- .setState(
-
org.apache.flink.api.common.JobStatus.RECONCILING
- .name());
- }
-
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
- } else {
- logger.warn(
- "Running deployment generation {} doesn't
match upgrade target generation {}.",
- deployedGeneration,
- upgradeTargetGeneration);
- }
- });
+
+ if (!depOpt.isPresent()) {
+ return false;
Review Comment:
Other places with `return false/true` have an explanation of what happens.
Maybe we can add something to all places. This applies not just here but to
all boolean exits.
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java:
##########
@@ -547,14 +553,30 @@ public Map<String, String> getClusterInfo(Configuration
conf) {
}
@Override
- public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf)
{
- if (conf.get(JobManagerOptions.SCHEDULER_MODE) !=
SchedulerExecutionMode.REACTIVE
- && jobSpec != null) {
+ public boolean scale(FlinkResourceContext<?> ctx) {
+ boolean standalone = ctx.getDeploymentMode() ==
KubernetesDeploymentMode.STANDALONE;
+ boolean session = ctx.getResource().getSpec().getJob() == null;
+ if (!standalone) {
return false;
}
- desiredReplicas =
-
conf.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
- return true;
+
+ if (session
+ || ctx.getObserveConfig().get(JobManagerOptions.SCHEDULER_MODE)
+ == SchedulerExecutionMode.REACTIVE) {
+ desiredReplicas =
+ ctx.getDeployConfig(ctx.getResource().getSpec())
+ .get(
+ StandaloneKubernetesConfigOptionsInternal
+ .KUBERNETES_TASKMANAGER_REPLICAS);
+ return true;
+ }
+
+ return false;
Review Comment:
Unless we want to add comments/logs it's better to have a single if
statement for easier understanding.
```
if (ctx.getDeploymentMode() == KubernetesDeploymentMode.STANDALONE &&
(ctx.getResource().getSpec().getJob() == null ||
ctx.getObserveConfig().get(JobManagerOptions.SCHEDULER_MODE) ==
SchedulerExecutionMode.REACTIVE))
```
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java:
##########
@@ -112,13 +119,33 @@ public TestingFlinkDeploymentController(
public UpdateControl<FlinkDeployment> reconcile(
FlinkDeployment flinkDeployment, Context<FlinkDeployment> context)
throws Exception {
FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment);
+ updateGeneration(cloned);
statusUpdateCounter.setCurrent(flinkDeployment);
UpdateControl<FlinkDeployment> updateControl =
flinkDeploymentController.reconcile(cloned, context);
Assertions.assertTrue(updateControl.isNoUpdate());
return updateControl;
}
+ private void updateGeneration(FlinkDeployment resource) {
Review Comment:
Is there a diff compared to TestingFlinkSessionJobController.java or just
copy/paste?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]