This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 66e14336 [FLINK-33535] Support autoscaler for session jobs
66e14336 is described below

commit 66e143362174491cef4b1d251b3fa21058fcf1c1
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Nov 13 11:37:50 2023 +0100

    [FLINK-33535] Support autoscaler for session jobs
---
 docs/content/docs/custom-resource/autoscaler.md    |  28 ++--
 .../flink/kubernetes/operator/FlinkOperator.java   |   3 +-
 .../operator/observer/JobStatusObserver.java       |   9 ++
 .../sessionjob/FlinkSessionJobObserver.java        |  49 +------
 .../sessionjob/SessionJobReconciler.java           |  32 +++--
 .../operator/service/AbstractFlinkService.java     |   5 +-
 .../kubernetes/operator/service/FlinkService.java  |   1 +
 .../kubernetes/operator/utils/FlinkUtils.java      |  27 ----
 .../kubernetes/operator/TestingFlinkService.java   |   3 +-
 .../controller/FlinkSessionJobControllerTest.java  |   8 +-
 .../TestingFlinkSessionJobController.java          |   4 +-
 .../sessionjob/FlinkSessionJobObserverTest.java    | 146 +++------------------
 .../sessionjob/SessionJobReconcilerTest.java       |  43 +++++-
 .../operator/service/AbstractFlinkServiceTest.java |   3 +-
 .../kubernetes/operator/utils/FlinkUtilsTest.java  |   7 -
 15 files changed, 126 insertions(+), 242 deletions(-)

diff --git a/docs/content/docs/custom-resource/autoscaler.md b/docs/content/docs/custom-resource/autoscaler.md
index a73aecd9..0d759745 100644
--- a/docs/content/docs/custom-resource/autoscaler.md
+++ b/docs/content/docs/custom-resource/autoscaler.md
@@ -95,6 +95,8 @@ The autoscaler currently only works with [Flink 1.17](https://hub.docker.com/_/f
    - [Fix logic for determining downstream subtasks for partitioner replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
  - [Support timespan for busyTime metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35) (good to have)
 
+For session job auto-scaling, a latest custom build of Flink 1.19 or 1.18 is required that contains the fix for [FLINK-33534](https://issues.apache.org/jira/browse/FLINK-33534).
+
 ### Limitations
 
 By default the autoscaler can work for all job vertices in the processing graph.
@@ -199,37 +201,37 @@ For a detailed config reference check the [general configuration page]({{< ref "
 
 ## Extensibility of Autoscaler
 
-The Autoscaler exposes a set of interfaces for storing autoscaler state, handling autoscaling events, 
-and executing scaling decisions. How these are implemented is specific to the orchestration framework 
-used (e.g. Kubernetes), but the interfaces are designed to be as generic as possible. The following 
+The Autoscaler exposes a set of interfaces for storing autoscaler state, handling autoscaling events,
+and executing scaling decisions. How these are implemented is specific to the orchestration framework
+used (e.g. Kubernetes), but the interfaces are designed to be as generic as possible. The following
 are the introduction of these generic interfaces:
 
 - **AutoScalerEventHandler** : Handling autoscaler events, such as: ScalingReport,
   AutoscalerError, etc. `LoggingEventHandler` is the default implementation, it logs events.
-- **AutoScalerStateStore** : Storing all state during scaling. `InMemoryAutoScalerStateStore` is 
-  the default implementation, it's based on the Java Heap, so the state will be discarded after 
+- **AutoScalerStateStore** : Storing all state during scaling. `InMemoryAutoScalerStateStore` is
+  the default implementation, it's based on the Java Heap, so the state will be discarded after
   process restarts. We will implement persistent State Store in the future, such as : `JdbcAutoScalerStateStore`.
 - **ScalingRealizer** : Applying scaling actions.
 - **JobAutoScalerContext** : Including all details related to the current job.
 
 ## Autoscaler Standalone
 
-**Flink Autoscaler Standalone** is an implementation of **Flink Autoscaler**, it runs as a separate java 
-process. It computes the reasonable parallelism of all job vertices by monitoring the metrics, such as: 
+**Flink Autoscaler Standalone** is an implementation of **Flink Autoscaler**, it runs as a separate java
+process. It computes the reasonable parallelism of all job vertices by monitoring the metrics, such as:
 processing rate, busy time, etc.
 
-Flink Autoscaler Standalone rescales flink job in-place by rest api of 
+Flink Autoscaler Standalone rescales flink job in-place by rest api of
 [Externalized Declarative Resource Management](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#externalized-declarative-resource-management).
-`RescaleApiScalingRealizer` is the default implementation of `ScalingRealizer`, it uses the Rescale API 
+`RescaleApiScalingRealizer` is the default implementation of `ScalingRealizer`, it uses the Rescale API
 to apply parallelism changes.
 
-Kubernetes Operator is well integrated with Autoscaler, we strongly recommend using Kubernetes Operator 
-directly for the kubernetes flink jobs, and only flink jobs in non-kubernetes environments use Autoscaler 
+Kubernetes Operator is well integrated with Autoscaler, we strongly recommend using Kubernetes Operator
+directly for the kubernetes flink jobs, and only flink jobs in non-kubernetes environments use Autoscaler
 Standalone.
 
 ### How To Use Autoscaler Standalone
 
-Currently, `Flink Autoscaler Standalone` only supports a single Flink cluster. It can be any type of 
+Currently, `Flink Autoscaler Standalone` only supports a single Flink cluster. It can be any type of
 Flink cluster, includes:
 
 - Flink Standalone Cluster
@@ -272,7 +274,7 @@ In general, the host and port are the same as Flink WebUI.
 
 ### Extensibility of autoscaler standalone
 
-Please click [here]({{< ref "docs/custom-resource/autoscaler#extensibility-of-autoscaler" >}}) 
+Please click [here]({{< ref "docs/custom-resource/autoscaler#extensibility-of-autoscaler" >}})
 to check out extensibility of generic autoscaler.
 
 `Autoscaler Standalone` isn't responsible for job management, so it doesn't have job information.
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 0735bf33..842eb978 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -189,7 +189,8 @@ public class FlinkOperator {
         var metricManager =
                 MetricManager.createFlinkSessionJobMetricManager(baseConfig, metricGroup);
         var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
-        var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder);
+        var autoscaler = AutoscalerFactory.create(client, eventRecorder);
+        var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder, autoscaler);
         var observer = new FlinkSessionJobObserver(eventRecorder);
         var canaryResourceManager = new CanaryResourceManager<FlinkSessionJob>(configManager);
         HealthProbe.INSTANCE.registerCanaryResourceManager(canaryResourceManager);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index e9a80368..d30cbfda 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.observer;
 
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.status.JobStatus;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -55,6 +56,14 @@ public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
      */
     public boolean observe(FlinkResourceContext<R> ctx) {
         var resource = ctx.getResource();
+        if (resource.getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getState()
+                == JobState.SUSPENDED) {
+            return false;
+        }
         var jobStatus = resource.getStatus().getJobStatus();
         LOG.debug("Observing job status");
         var previousJobStatus = jobStatus.getState();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
index d6d7f777..1846b156 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.kubernetes.operator.observer.sessionjob;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobStatus;
@@ -35,14 +34,11 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.kubernetes.operator.utils.FlinkUtils.generateSessionJobFixedJobID;
-
 /** The observer of {@link FlinkSessionJob}. */
 public class FlinkSessionJobObserver extends AbstractFlinkResourceObserver<FlinkSessionJob> {
 
@@ -74,57 +70,20 @@ public class FlinkSessionJobObserver extends AbstractFlinkResourceObserver<Flink
     @Override
     protected boolean checkIfAlreadyUpgraded(FlinkResourceContext<FlinkSessionJob> ctx) {
         var flinkSessionJob = ctx.getResource();
-        var uid = flinkSessionJob.getMetadata().getUid();
         Collection<JobStatusMessage> jobStatusMessages;
         try {
             jobStatusMessages = ctx.getFlinkService().listJobs(ctx.getObserveConfig());
         } catch (Exception e) {
             throw new RuntimeException("Failed to list jobs", e);
         }
-        var matchedJobs = new ArrayList<JobID>();
+        var submittedJobId = flinkSessionJob.getStatus().getJobStatus().getJobId();
         for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
-            var jobId = jobStatusMessage.getJobId();
-            if (jobId.getLowerPart()
-                            == generateSessionJobFixedJobID(uid, jobId.getUpperPart() + 1L)
-                                    .getLowerPart()
-                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
-                matchedJobs.add(jobId);
-            }
-        }
-
-        if (matchedJobs.isEmpty()) {
-            return false;
-        } else if (matchedJobs.size() > 1) {
-            // this indicates a bug, which means we have more than one running job for a single
-            // SessionJob CR.
-            throw new RuntimeException(
-                    String.format(
-                            "Unexpected case: %d job found for the resource with uid: %s",
-                            matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
-        } else {
-            var matchedJobID = matchedJobs.get(0);
-            Long upgradeTargetGeneration =
-                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
-            long deployedGeneration = matchedJobID.getUpperPart();
-            var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
-
-            if (upgradeTargetGeneration == deployedGeneration) {
-                LOG.info(
-                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
-                        oldJobID,
-                        matchedJobID.toHexString());
-                flinkSessionJob.getStatus().getJobStatus().setJobId(matchedJobID.toHexString());
+            if (jobStatusMessage.getJobId().toHexString().equals(submittedJobId)) {
+                LOG.info("Job with id {} is already deployed.", submittedJobId);
                 return true;
-            } else {
-                var msg =
-                        String.format(
-                                "Running job %s's generation %s doesn't match upgrade target generation %s.",
-                                matchedJobID.toHexString(),
-                                deployedGeneration,
-                                upgradeTargetGeneration);
-                throw new RuntimeException(msg);
             }
         }
+        return false;
     }
 
     private static class SessionJobStatusObserver extends JobStatusObserver<FlinkSessionJob> {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index c7cac9df..60b6b26f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
 
-import org.apache.flink.autoscaler.NoopJobAutoscaler;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
@@ -25,6 +26,7 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
@@ -34,6 +36,7 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
 
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,8 +51,9 @@ public class SessionJobReconciler
 
     public SessionJobReconciler(
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder) {
-        super(eventRecorder, statusRecorder, new NoopJobAutoscaler<>());
+            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder,
+            JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
+        super(eventRecorder, statusRecorder, autoscaler);
     }
 
     @Override
@@ -75,16 +79,21 @@ public class SessionJobReconciler
                 EventRecorder.Component.Job,
                 MSG_SUBMIT,
                 ctx.getKubernetesClient());
-        var jobID =
-                ctx.getFlinkService()
-                        .submitJobToSessionCluster(
-                                ctx.getResource().getMetadata(),
-                                sessionJobSpec,
-                                deployConfig,
-                                savepoint.orElse(null));
+
+        // Generate job id and record in status for durability
+        var jobId = JobID.generate();
+        ctx.getResource().getStatus().getJobStatus().setJobId(jobId.toHexString());
+        statusRecorder.patchAndCacheStatus(ctx.getResource(), ctx.getKubernetesClient());
+
+        ctx.getFlinkService()
+                .submitJobToSessionCluster(
+                        ctx.getResource().getMetadata(),
+                        sessionJobSpec,
+                        jobId,
+                        deployConfig,
+                        savepoint.orElse(null));
 
         var status = ctx.getResource().getStatus();
-        status.getJobStatus().setJobId(jobID.toHexString());
         status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
     }
 
@@ -93,6 +102,7 @@ public class SessionJobReconciler
             throws Exception {
         ctx.getFlinkService()
                 .cancelSessionJob(ctx.getResource(), upgradeMode, ctx.getObserveConfig());
+        ctx.getResource().getStatus().getJobStatus().setJobId(null);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 9f8bff3d..611ab152 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -223,11 +223,10 @@ public abstract class AbstractFlinkService implements FlinkService {
     public JobID submitJobToSessionCluster(
             ObjectMeta meta,
             FlinkSessionJobSpec spec,
+            JobID jobID,
             Configuration conf,
             @Nullable String savepoint)
             throws Exception {
-        // we generate jobID in advance to help deduplicate job submission.
-        var jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
         runJar(spec.getJob(), jobID, uploadJar(meta, spec, conf), conf, savepoint);
         LOG.info("Submitted job: {} to session cluster.", jobID);
         return jobID;
@@ -811,7 +810,7 @@ public abstract class AbstractFlinkService implements FlinkService {
                             conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16)
                                     ? conf.toMap()
                                     : null);
-            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            LOG.info("Submitting job: {} to session cluster.", jobID);
             clusterClient
                     .sendRequest(headers, parameters, runRequestBody)
                     .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index d3d1b773..9fca52d0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -64,6 +64,7 @@ public interface FlinkService {
     JobID submitJobToSessionCluster(
             ObjectMeta meta,
             FlinkSessionJobSpec spec,
+            JobID jobID,
             Configuration conf,
             @Nullable String savepoint)
             throws Exception;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index d8643e2b..cd12f056 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -32,7 +31,6 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,7 +41,6 @@ import io.fabric8.kubernetes.api.model.ConfigMapList;
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.HTTPGetAction;
 import io.fabric8.kubernetes.api.model.IntOrString;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.Probe;
@@ -61,7 +58,6 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
 
 import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
 
@@ -396,29 +392,6 @@ public class FlinkUtils {
         conf.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, labels);
     }
 
-    /**
-     * The jobID's lower part is the resource uid, the higher part is the resource generation.
-     *
-     * @param meta the meta of the resource.
-     * @return the generated jobID.
-     */
-    public static JobID generateSessionJobFixedJobID(ObjectMeta meta) {
-        return generateSessionJobFixedJobID(meta.getUid(), meta.getGeneration());
-    }
-
-    /**
-     * The jobID's lower part is the resource uid, the higher part is the resource generation.
-     *
-     * @param uid the uid of the resource.
-     * @param generation the generation of the resource.
-     * @return the generated jobID.
-     */
-    public static JobID generateSessionJobFixedJobID(String uid, Long generation) {
-        return new JobID(
-                UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(),
-                Preconditions.checkNotNull(generation));
-    }
-
     /**
      * Check if the jobmanager pod has never successfully started. This is an important check to
      * determine whether it is possible that the job has started and taken any checkpoints that we
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index fe61cc67..14fa2a9d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -52,7 +52,6 @@ import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
 import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
 import org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest;
 import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -251,6 +250,7 @@ public class TestingFlinkService extends AbstractFlinkService {
     public JobID submitJobToSessionCluster(
             ObjectMeta meta,
             FlinkSessionJobSpec spec,
+            JobID jobID,
             Configuration conf,
             @Nullable String savepoint)
             throws Exception {
@@ -258,7 +258,6 @@ public class TestingFlinkService extends AbstractFlinkService {
         if (deployFailure) {
             throw new Exception("Deployment failure");
         }
-        JobID jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
         JobStatusMessage jobStatusMessage =
                 new JobStatusMessage(
                         jobID,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index 0705f70f..5c620256 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -116,7 +116,7 @@ class FlinkSessionJobControllerTest {
         updateControl = testController.reconcile(sessionJob, context);
 
         assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
-        assertEquals(5, testController.getInternalStatusUpdateCount());
+        assertEquals(6, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
 
         FlinkSessionJobReconciliationStatus reconciliationStatus =
@@ -574,7 +574,7 @@ class FlinkSessionJobControllerTest {
         // Reconciling
         assertEquals(
                 JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
-        assertEquals(3, testController.getInternalStatusUpdateCount());
+        assertEquals(4, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -591,7 +591,7 @@ class FlinkSessionJobControllerTest {
         // Running
         updateControl = testController.reconcile(sessionJob, context);
         assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
-        assertEquals(4, testController.getInternalStatusUpdateCount());
+        assertEquals(5, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -601,7 +601,7 @@ class FlinkSessionJobControllerTest {
         // Stable loop
         updateControl = testController.reconcile(sessionJob, context);
         assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
-        assertEquals(4, testController.getInternalStatusUpdateCount());
+        assertEquals(5, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
index 8926492c..5afefd41 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.autoscaler.NoopJobAutoscaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkResourceContextFactory;
@@ -90,7 +91,8 @@ public class TestingFlinkSessionJobController
                 new FlinkSessionJobController(
                         ValidatorUtils.discoverValidators(configManager),
                         ctxFactory,
-                        new SessionJobReconciler(eventRecorder, statusRecorder),
+                        new SessionJobReconciler(
+                                eventRecorder, statusRecorder, new NoopJobAutoscaler<>()),
                         new FlinkSessionJobObserver(eventRecorder),
                         statusRecorder,
                         eventRecorder,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
index 6de56566..c0005127 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.observer.sessionjob;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.NoopJobAutoscaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.RestOptions;
@@ -34,9 +35,7 @@ import org.apache.flink.kubernetes.operator.observer.TestObserverAdapter;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -44,6 +43,8 @@ import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Map;
 
@@ -66,7 +67,9 @@ public class FlinkSessionJobObserverTest extends OperatorTestBase {
         observer = new TestObserverAdapter<>(this, new FlinkSessionJobObserver(eventRecorder));
         reconciler =
                 new TestReconcilerAdapter<>(
-                        this, new SessionJobReconciler(eventRecorder, statusRecorder));
+                        this,
+                        new SessionJobReconciler(
+                                eventRecorder, statusRecorder, new NoopJobAutoscaler<>()));
     }
 
     @Test
@@ -268,146 +271,37 @@ public class FlinkSessionJobObserverTest extends OperatorTestBase {
         assertFalse(SnapshotUtils.checkpointInProgress(sessionJob.getStatus().getJobStatus()));
     }
 
-    @Test
-    public void testObserveAlreadySubmitted() {
-        final var sessionJob = TestUtils.buildSessionJob();
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testObserveAlreadySubmitted(boolean submitted) {
+        var sessionJob = TestUtils.buildSessionJob();
+        sessionJob.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
         sessionJob.getMetadata().setGeneration(10L);
-        final var readyContext = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+        var readyContext = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
 
         flinkService.setSessionJobSubmittedCallback(
                 () -> {
                     throw new RuntimeException("Failed after submitted job");
                 });
-        // submit job
+        // submit job but fail during submission
         Assertions.assertThrows(
                 RuntimeException.class, () -> reconciler.reconcile(sessionJob, readyContext));
-        Assertions.assertNotNull(sessionJob.getStatus().getReconciliationStatus());
-        Assertions.assertEquals(
-                ReconciliationState.UPGRADING,
-                sessionJob.getStatus().getReconciliationStatus().getState());
-        Assertions.assertNull(sessionJob.getStatus().getJobStatus().getJobId());
-
-        observer.observe(sessionJob, readyContext);
-        Assertions.assertEquals(
-                ReconciliationState.DEPLOYED,
-                sessionJob.getStatus().getReconciliationStatus().getState());
-        var jobID = sessionJob.getStatus().getJobStatus().getJobId();
-        Assertions.assertNotNull(jobID);
-        Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
-        Assertions.assertEquals(
-                JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
-    }
-
-    @Test
-    public void testObserveAlreadyUpgraded() throws Exception {
-        final var sessionJob = TestUtils.buildSessionJob();
-        sessionJob.getMetadata().setGeneration(10L);
-        final var readyContext = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
-
-        reconciler.reconcile(sessionJob, readyContext);
-        observer.observe(sessionJob, readyContext);
-        Assertions.assertEquals(
-                ReconciliationState.DEPLOYED,
-                sessionJob.getStatus().getReconciliationStatus().getState());
-        var jobID = sessionJob.getStatus().getJobStatus().getJobId();
-        Assertions.assertNotNull(jobID);
-        Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
-        Assertions.assertEquals(
-                JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
-
-        flinkService.setSessionJobSubmittedCallback(
-                () -> {
-                    throw new RuntimeException("Failed after submitted job");
-                });
-        sessionJob.getSpec().getJob().setParallelism(10);
-        sessionJob.getMetadata().setGeneration(11L);
-
-        // upgrade
-        Assertions.assertThrows(
-                RuntimeException.class,
-                () -> {
-                    // suspend
-                    reconciler.reconcile(sessionJob, readyContext);
-                    // upgrade
-                    reconciler.reconcile(sessionJob, readyContext);
-                });
-
         Assertions.assertEquals(
                 ReconciliationState.UPGRADING,
                 sessionJob.getStatus().getReconciliationStatus().getState());
-        // jobID not changed
-        Assertions.assertEquals(jobID, sessionJob.getStatus().getJobStatus().getJobId());
-
-        observer.observe(sessionJob, readyContext);
-
-        Assertions.assertEquals(
-                ReconciliationState.DEPLOYED,
-                sessionJob.getStatus().getReconciliationStatus().getState());
-        Assertions.assertEquals(
-                11L,
-                JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId())
-                        .getUpperPart());
-    }
 
-    @Test
-    public void testOrphanedJob() throws Exception {
-        final var sessionJob = TestUtils.buildSessionJob();
-        sessionJob.getMetadata().setGeneration(10L);
-        final var readyContext = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+        if (!submitted) {
+            // Pretend that job was never submitted
+            flinkService.clear();
+        }
 
-        reconciler.reconcile(sessionJob, readyContext);
         observer.observe(sessionJob, readyContext);
         Assertions.assertEquals(
-                ReconciliationState.DEPLOYED,
+                submitted ? ReconciliationState.DEPLOYED : ReconciliationState.UPGRADING,
                 sessionJob.getStatus().getReconciliationStatus().getState());
-        var jobID = sessionJob.getStatus().getJobStatus().getJobId();
-        Assertions.assertNotNull(jobID);
-        Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
-        Assertions.assertEquals(
-                JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
-
-        flinkService.setSessionJobSubmittedCallback(
-                () -> {
-                    throw new RuntimeException("Failed after submitted job");
-                });
-        sessionJob.getSpec().getJob().setParallelism(10);
-        sessionJob.getMetadata().setGeneration(11L);
-        // upgrade
-        Assertions.assertThrows(
-                RuntimeException.class,
-                () -> {
-                    // suspend
-                    reconciler.reconcile(sessionJob, readyContext);
-                    // upgrade
-                    reconciler.reconcile(sessionJob, readyContext);
-                });
-
         Assertions.assertEquals(
-                ReconciliationState.UPGRADING,
-                sessionJob.getStatus().getReconciliationStatus().getState());
-        // jobID not changed
-        Assertions.assertEquals(jobID, sessionJob.getStatus().getJobStatus().getJobId());
-
-        // mock a job with different id of the target CR occurs
-        var jobs = flinkService.listJobs();
-        for (var job : jobs) {
-            if (!job.f1.getJobState().isGloballyTerminalState()
-                    && !job.f1.getJobId().toHexString().equals(jobID)) {
-                job.f1 =
-                        new JobStatusMessage(
-                                FlinkUtils.generateSessionJobFixedJobID(
-                                        sessionJob.getMetadata().getUid(), -1L),
-                                job.f1.getJobName(),
-                                job.f1.getJobState(),
-                                job.f1.getStartTime());
-            }
-        }
-
-        var exception =
-                Assertions.assertThrows(
-                        RuntimeException.class, () -> observer.observe(sessionJob, readyContext));
-        Assertions.assertTrue(
-                exception.getMessage().contains("doesn't match upgrade target generation"));
+                submitted ? JobStatus.RUNNING.name() : JobStatus.RECONCILING.name(),
+                sessionJob.getStatus().getJobStatus().getState());
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index ab518429..96a6e6cb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.autoscaler.NoopJobAutoscaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
@@ -28,6 +29,7 @@ import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobStatus;
+import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -92,7 +94,9 @@ public class SessionJobReconcilerTest extends OperatorTestBase {
         configManager = new FlinkConfigManager(configuration);
         reconciler =
                 new TestReconcilerAdapter<>(
-                        this, new SessionJobReconciler(eventRecorder, statusRecorder));
+                        this,
+                        new SessionJobReconciler(
+                                eventRecorder, statusRecorder, new NoopJobAutoscaler<>()));
     }
 
     @Test
@@ -790,4 +794,41 @@ public class SessionJobReconcilerTest extends OperatorTestBase {
                 spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
         assertEquals("FINISHED", spSessionJob.getStatus().getJobStatus().getState());
     }
+
+    @Test
+    public void testJobIdGeneration() throws Exception {
+        var sessionJob = TestUtils.buildSessionJob();
+        sessionJob.getMetadata().setGeneration(10L);
+        var readyContext = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+
+        reconciler.reconcile(sessionJob, readyContext);
+        Assertions.assertEquals(
+                ReconciliationState.DEPLOYED,
+                sessionJob.getStatus().getReconciliationStatus().getState());
+        var jobID = sessionJob.getStatus().getJobStatus().getJobId();
+        Assertions.assertEquals(
+                RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+        Assertions.assertEquals(jobID, flinkService.listJobs().get(0).f1.getJobId().toString());
+
+        flinkService.setSessionJobSubmittedCallback(
+                () -> {
+                    throw new RuntimeException("Failed after submitted job");
+                });
+        sessionJob.getSpec().getJob().setParallelism(10);
+        // upgrade
+        Assertions.assertThrows(
+                RuntimeException.class,
+                () -> {
+                    // suspend
+                    reconciler.reconcile(sessionJob, readyContext);
+                    // upgrade
+                    reconciler.reconcile(sessionJob, readyContext);
+                });
+
+        Assertions.assertEquals(
+                ReconciliationState.UPGRADING,
+                sessionJob.getStatus().getReconciliationStatus().getState());
+        // New jobID recorded despite failure
+        Assertions.assertNotEquals(jobID, sessionJob.getStatus().getJobStatus().getJobId());
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 81f7c67c..3739298f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -198,7 +198,8 @@ public class AbstractFlinkServiceTest {
 
         var job = TestUtils.buildSessionJob();
         var deployConf = configManager.getSessionJobConfig(session, job.getSpec());
-        flinkService.submitJobToSessionCluster(job.getMetadata(), job.getSpec(), deployConf, null);
+        flinkService.submitJobToSessionCluster(
+                job.getMetadata(), job.getSpec(), JobID.generate(), deployConf, null);
 
         // Make sure that deploy conf was passed to jar run
         if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_16)) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 912fef5f..23e2204e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -355,13 +355,6 @@ public class FlinkUtilsTest {
         assertEquals(List.of(v1merged, volume2, volume3), mergedPod.getSpec().getVolumes());
     }
 
-    @Test
-    public void testJobIDGeneration() {
-        JobID jobID =
-                FlinkUtils.generateSessionJobFixedJobID("ffffffff-ffff-ffff-aaaa-aaaaaaaaaaaa", 2L);
-        assertEquals("ffffffffffffffff0000000000000002", jobID.toString());
-    }
-
     private void createHAConfigMapWithData(
             String configMapName, String namespace, String clusterId, Map<String, String> data) {
         final ConfigMap kubernetesConfigMap =

