This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 66e14336 [FLINK-33535] Support autoscaler for session jobs
66e14336 is described below
commit 66e143362174491cef4b1d251b3fa21058fcf1c1
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Nov 13 11:37:50 2023 +0100
[FLINK-33535] Support autoscaler for session jobs
---
docs/content/docs/custom-resource/autoscaler.md | 28 ++--
.../flink/kubernetes/operator/FlinkOperator.java | 3 +-
.../operator/observer/JobStatusObserver.java | 9 ++
.../sessionjob/FlinkSessionJobObserver.java | 49 +------
.../sessionjob/SessionJobReconciler.java | 32 +++--
.../operator/service/AbstractFlinkService.java | 5 +-
.../kubernetes/operator/service/FlinkService.java | 1 +
.../kubernetes/operator/utils/FlinkUtils.java | 27 ----
.../kubernetes/operator/TestingFlinkService.java | 3 +-
.../controller/FlinkSessionJobControllerTest.java | 8 +-
.../TestingFlinkSessionJobController.java | 4 +-
.../sessionjob/FlinkSessionJobObserverTest.java | 146 +++------------------
.../sessionjob/SessionJobReconcilerTest.java | 43 +++++-
.../operator/service/AbstractFlinkServiceTest.java | 3 +-
.../kubernetes/operator/utils/FlinkUtilsTest.java | 7 -
15 files changed, 126 insertions(+), 242 deletions(-)
diff --git a/docs/content/docs/custom-resource/autoscaler.md
b/docs/content/docs/custom-resource/autoscaler.md
index a73aecd9..0d759745 100644
--- a/docs/content/docs/custom-resource/autoscaler.md
+++ b/docs/content/docs/custom-resource/autoscaler.md
@@ -95,6 +95,8 @@ The autoscaler currently only works with [Flink
1.17](https://hub.docker.com/_/f
- [Fix logic for determining downstream subtasks for partitioner
replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
- [Support timespan for busyTime
metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35)
(good to have)
+For session job auto-scaling, a latest custom build of Flink 1.19 or 1.18 is required that contains the fix for [FLINK-33534](https://issues.apache.org/jira/browse/FLINK-33534).
+
### Limitations
By default the autoscaler can work for all job vertices in the processing
graph.
@@ -199,37 +201,37 @@ For a detailed config reference check the [general
configuration page]({{< ref "
## Extensibility of Autoscaler
-The Autoscaler exposes a set of interfaces for storing autoscaler state,
handling autoscaling events,
-and executing scaling decisions. How these are implemented is specific to the
orchestration framework
-used (e.g. Kubernetes), but the interfaces are designed to be as generic as
possible. The following
+The Autoscaler exposes a set of interfaces for storing autoscaler state,
handling autoscaling events,
+and executing scaling decisions. How these are implemented is specific to the
orchestration framework
+used (e.g. Kubernetes), but the interfaces are designed to be as generic as
possible. The following
are the introduction of these generic interfaces:
- **AutoScalerEventHandler** : Handling autoscaler events, such as:
ScalingReport,
AutoscalerError, etc. `LoggingEventHandler` is the default implementation,
it logs events.
-- **AutoScalerStateStore** : Storing all state during scaling.
`InMemoryAutoScalerStateStore` is
- the default implementation, it's based on the Java Heap, so the state will
be discarded after
+- **AutoScalerStateStore** : Storing all state during scaling.
`InMemoryAutoScalerStateStore` is
+ the default implementation, it's based on the Java Heap, so the state will
be discarded after
process restarts. We will implement persistent State Store in the future,
such as : `JdbcAutoScalerStateStore`.
- **ScalingRealizer** : Applying scaling actions.
- **JobAutoScalerContext** : Including all details related to the current job.
## Autoscaler Standalone
-**Flink Autoscaler Standalone** is an implementation of **Flink Autoscaler**,
it runs as a separate java
-process. It computes the reasonable parallelism of all job vertices by
monitoring the metrics, such as:
+**Flink Autoscaler Standalone** is an implementation of **Flink Autoscaler**,
it runs as a separate java
+process. It computes the reasonable parallelism of all job vertices by
monitoring the metrics, such as:
processing rate, busy time, etc.
-Flink Autoscaler Standalone rescales flink job in-place by rest api of
+Flink Autoscaler Standalone rescales flink job in-place by rest api of
[Externalized Declarative Resource
Management](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#externalized-declarative-resource-management).
-`RescaleApiScalingRealizer` is the default implementation of
`ScalingRealizer`, it uses the Rescale API
+`RescaleApiScalingRealizer` is the default implementation of
`ScalingRealizer`, it uses the Rescale API
to apply parallelism changes.
-Kubernetes Operator is well integrated with Autoscaler, we strongly recommend
using Kubernetes Operator
-directly for the kubernetes flink jobs, and only flink jobs in non-kubernetes
environments use Autoscaler
+Kubernetes Operator is well integrated with Autoscaler, we strongly recommend
using Kubernetes Operator
+directly for the kubernetes flink jobs, and only flink jobs in non-kubernetes
environments use Autoscaler
Standalone.
### How To Use Autoscaler Standalone
-Currently, `Flink Autoscaler Standalone` only supports a single Flink cluster.
It can be any type of
+Currently, `Flink Autoscaler Standalone` only supports a single Flink cluster.
It can be any type of
Flink cluster, includes:
- Flink Standalone Cluster
@@ -272,7 +274,7 @@ In general, the host and port are the same as Flink WebUI.
### Extensibility of autoscaler standalone
-Please click [here]({{< ref
"docs/custom-resource/autoscaler#extensibility-of-autoscaler" >}})
+Please click [here]({{< ref
"docs/custom-resource/autoscaler#extensibility-of-autoscaler" >}})
to check out extensibility of generic autoscaler.
`Autoscaler Standalone` isn't responsible for job management, so it doesn't
have job information.
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 0735bf33..842eb978 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -189,7 +189,8 @@ public class FlinkOperator {
var metricManager =
MetricManager.createFlinkSessionJobMetricManager(baseConfig,
metricGroup);
var statusRecorder = StatusRecorder.create(client, metricManager,
listeners);
- var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder);
+ var autoscaler = AutoscalerFactory.create(client, eventRecorder);
+ var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder, autoscaler);
var observer = new FlinkSessionJobObserver(eventRecorder);
var canaryResourceManager = new
CanaryResourceManager<FlinkSessionJob>(configManager);
HealthProbe.INSTANCE.registerCanaryResourceManager(canaryResourceManager);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index e9a80368..d30cbfda 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.observer;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -55,6 +56,14 @@ public abstract class JobStatusObserver<R extends
AbstractFlinkResource<?, ?>> {
*/
public boolean observe(FlinkResourceContext<R> ctx) {
var resource = ctx.getResource();
+ if (resource.getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getState()
+ == JobState.SUSPENDED) {
+ return false;
+ }
var jobStatus = resource.getStatus().getJobStatus();
LOG.debug("Observing job status");
var previousJobStatus = jobStatus.getState();
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
index d6d7f777..1846b156 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
@@ -17,7 +17,6 @@
package org.apache.flink.kubernetes.operator.observer.sessionjob;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
@@ -35,14 +34,11 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
-import static
org.apache.flink.kubernetes.operator.utils.FlinkUtils.generateSessionJobFixedJobID;
-
/** The observer of {@link FlinkSessionJob}. */
public class FlinkSessionJobObserver extends
AbstractFlinkResourceObserver<FlinkSessionJob> {
@@ -74,57 +70,20 @@ public class FlinkSessionJobObserver extends
AbstractFlinkResourceObserver<Flink
@Override
protected boolean
checkIfAlreadyUpgraded(FlinkResourceContext<FlinkSessionJob> ctx) {
var flinkSessionJob = ctx.getResource();
- var uid = flinkSessionJob.getMetadata().getUid();
Collection<JobStatusMessage> jobStatusMessages;
try {
jobStatusMessages =
ctx.getFlinkService().listJobs(ctx.getObserveConfig());
} catch (Exception e) {
throw new RuntimeException("Failed to list jobs", e);
}
- var matchedJobs = new ArrayList<JobID>();
+ var submittedJobId = flinkSessionJob.getStatus().getJobStatus().getJobId();
for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
- var jobId = jobStatusMessage.getJobId();
- if (jobId.getLowerPart()
- == generateSessionJobFixedJobID(uid,
jobId.getUpperPart() + 1L)
- .getLowerPart()
- &&
!jobStatusMessage.getJobState().isGloballyTerminalState()) {
- matchedJobs.add(jobId);
- }
- }
-
- if (matchedJobs.isEmpty()) {
- return false;
- } else if (matchedJobs.size() > 1) {
- // this indicates a bug, which means we have more than one running
job for a single
- // SessionJob CR.
- throw new RuntimeException(
- String.format(
- "Unexpected case: %d job found for the resource
with uid: %s",
- matchedJobs.size(),
flinkSessionJob.getMetadata().getUid()));
- } else {
- var matchedJobID = matchedJobs.get(0);
- Long upgradeTargetGeneration =
-
ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
- long deployedGeneration = matchedJobID.getUpperPart();
- var oldJobID =
flinkSessionJob.getStatus().getJobStatus().getJobId();
-
- if (upgradeTargetGeneration == deployedGeneration) {
- LOG.info(
- "Pending upgrade is already deployed, updating status.
Old jobID:{}, new jobID:{}",
- oldJobID,
- matchedJobID.toHexString());
-
flinkSessionJob.getStatus().getJobStatus().setJobId(matchedJobID.toHexString());
+ if (jobStatusMessage.getJobId().toHexString().equals(submittedJobId)) {
+ LOG.info("Job with id {} is already deployed.", submittedJobId);
return true;
- } else {
- var msg =
- String.format(
- "Running job %s's generation %s doesn't match
upgrade target generation %s.",
- matchedJobID.toHexString(),
- deployedGeneration,
- upgradeTargetGeneration);
- throw new RuntimeException(msg);
}
}
+ return false;
}
private static class SessionJobStatusObserver extends
JobStatusObserver<FlinkSessionJob> {
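
The simplified `checkIfAlreadyUpgraded` above reduces to a lookup of the recorded job id among the jobs the session cluster reports. A minimal standalone sketch of that lookup follows, assuming plain hex strings instead of Flink's `JobID` and `JobStatusMessage`; the class and method names are hypothetical and are not part of the operator.

```java
import java.util.List;

/** Hypothetical stand-in for the observer's "already deployed" check. */
final class SubmittedJobCheck {

    /**
     * Returns true if the job id recorded in the resource status is among
     * the ids reported by the session cluster.
     *
     * @param clusterJobIds hex job ids from a (hypothetical) list-jobs call
     * @param recordedJobId job id stored in the status before submission; may be null
     */
    static boolean isAlreadyDeployed(List<String> clusterJobIds, String recordedJobId) {
        if (recordedJobId == null) {
            // Nothing was recorded (for example after a cancel), so nothing can match.
            return false;
        }
        return clusterJobIds.stream().anyMatch(recordedJobId::equals);
    }

    public static void main(String[] args) {
        List<String> cluster = List.of("0a1b", "2c3d");
        System.out.println(isAlreadyDeployed(cluster, "2c3d")); // true
        System.out.println(isAlreadyDeployed(cluster, "ffff")); // false
        System.out.println(isAlreadyDeployed(cluster, null));   // false
    }
}
```

Unlike the removed logic, this check needs no knowledge of the resource uid or generation; it only depends on the id having been recorded before submission.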
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index c7cac9df..60b6b26f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -17,7 +17,8 @@
package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
-import org.apache.flink.autoscaler.NoopJobAutoscaler;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
@@ -25,6 +26,7 @@ import
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
@@ -34,6 +36,7 @@ import
org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import
org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +51,9 @@ public class SessionJobReconciler
public SessionJobReconciler(
EventRecorder eventRecorder,
- StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder) {
- super(eventRecorder, statusRecorder, new NoopJobAutoscaler<>());
+ StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder,
+ JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
+ super(eventRecorder, statusRecorder, autoscaler);
}
@Override
@@ -75,16 +79,21 @@ public class SessionJobReconciler
EventRecorder.Component.Job,
MSG_SUBMIT,
ctx.getKubernetesClient());
- var jobID =
- ctx.getFlinkService()
- .submitJobToSessionCluster(
- ctx.getResource().getMetadata(),
- sessionJobSpec,
- deployConfig,
- savepoint.orElse(null));
+
+ // Generate job id and record in status for durability
+ var jobId = JobID.generate();
+ ctx.getResource().getStatus().getJobStatus().setJobId(jobId.toHexString());
+ statusRecorder.patchAndCacheStatus(ctx.getResource(), ctx.getKubernetesClient());
+
+ ctx.getFlinkService()
+ .submitJobToSessionCluster(
+ ctx.getResource().getMetadata(),
+ sessionJobSpec,
+ jobId,
+ deployConfig,
+ savepoint.orElse(null));
var status = ctx.getResource().getStatus();
- status.getJobStatus().setJobId(jobID.toHexString());
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
}
@@ -93,6 +102,7 @@ public class SessionJobReconciler
throws Exception {
ctx.getFlinkService()
.cancelSessionJob(ctx.getResource(), upgradeMode,
ctx.getObserveConfig());
+ ctx.getResource().getStatus().getJobStatus().setJobId(null);
}
@Override
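
The reconciler change above records a freshly generated job id in the status before the job is submitted, so a failure after submission still leaves an id that can be matched against the cluster. The ordering can be sketched in isolation as below. This is an illustration under stated assumptions, not the operator's code: `Status` and `Cluster` are hypothetical in-memory stand-ins, and a random UUID rendered as 32 hex characters stands in for `JobID.generate()`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

final class WriteAheadSubmit {

    /** Hypothetical stand-in for the resource status persisted by the operator. */
    static final class Status {
        String jobId;
    }

    /** Hypothetical stand-in for a session cluster that may fail after accepting a job. */
    static final class Cluster {
        final Set<String> jobs = new HashSet<>();
        boolean failAfterSubmit;

        void submit(String jobId) {
            jobs.add(jobId);
            if (failAfterSubmit) {
                throw new IllegalStateException("Failed after submitted job");
            }
        }
    }

    /** Records the id first, then submits; the id survives a failed submit call. */
    static void deploy(Status status, Cluster cluster) {
        // Assumption: a random 32-character hex string plays the role of the generated job id.
        String jobId = UUID.randomUUID().toString().replace("-", "");
        status.jobId = jobId;   // step 1: record the id durably
        cluster.submit(jobId);  // step 2: submit under that id
    }

    public static void main(String[] args) {
        Status status = new Status();
        Cluster cluster = new Cluster();
        cluster.failAfterSubmit = true;
        try {
            deploy(status, cluster);
        } catch (IllegalStateException e) {
            // The submit call failed, but the recorded id still identifies the running job.
            System.out.println(cluster.jobs.contains(status.jobId));
        }
    }
}
```

If the two steps were swapped, a failure between them would leave a running job whose id was never recorded, which is the situation the earlier uid-and-generation scheme had to detect by scanning job ids.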
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 9f8bff3d..611ab152 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -223,11 +223,10 @@ public abstract class AbstractFlinkService implements
FlinkService {
public JobID submitJobToSessionCluster(
ObjectMeta meta,
FlinkSessionJobSpec spec,
+ JobID jobID,
Configuration conf,
@Nullable String savepoint)
throws Exception {
- // we generate jobID in advance to help deduplicate job submission.
- var jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
runJar(spec.getJob(), jobID, uploadJar(meta, spec, conf), conf,
savepoint);
LOG.info("Submitted job: {} to session cluster.", jobID);
return jobID;
@@ -811,7 +810,7 @@ public abstract class AbstractFlinkService implements
FlinkService {
conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_16)
? conf.toMap()
: null);
- LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+ LOG.info("Submitting job: {} to session cluster.", jobID);
clusterClient
.sendRequest(headers, parameters, runRequestBody)
.get(operatorConfig.getFlinkClientTimeout().toSeconds(),
TimeUnit.SECONDS);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index d3d1b773..9fca52d0 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -64,6 +64,7 @@ public interface FlinkService {
JobID submitJobToSessionCluster(
ObjectMeta meta,
FlinkSessionJobSpec spec,
+ JobID jobID,
Configuration conf,
@Nullable String savepoint)
throws Exception;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index d8643e2b..cd12f056 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -17,7 +17,6 @@
package org.apache.flink.kubernetes.operator.utils;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -32,7 +31,6 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,7 +41,6 @@ import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.HTTPGetAction;
import io.fabric8.kubernetes.api.model.IntOrString;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.Probe;
@@ -61,7 +58,6 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.UUID;
import static
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
@@ -396,29 +392,6 @@ public class FlinkUtils {
conf.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, labels);
}
- /**
- * The jobID's lower part is the resource uid, the higher part is the
resource generation.
- *
- * @param meta the meta of the resource.
- * @return the generated jobID.
- */
- public static JobID generateSessionJobFixedJobID(ObjectMeta meta) {
- return generateSessionJobFixedJobID(meta.getUid(),
meta.getGeneration());
- }
-
- /**
- * The jobID's lower part is the resource uid, the higher part is the
resource generation.
- *
- * @param uid the uid of the resource.
- * @param generation the generation of the resource.
- * @return the generated jobID.
- */
- public static JobID generateSessionJobFixedJobID(String uid, Long
generation) {
- return new JobID(
-
UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(),
- Preconditions.checkNotNull(generation));
- }
-
/**
* Check if the jobmanager pod has never successfully started. This is an
important check to
* determine whether it is possible that the job has started and taken any
checkpoints that we
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index fe61cc67..14fa2a9d 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -52,7 +52,6 @@ import
org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
import org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest;
import
org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -251,6 +250,7 @@ public class TestingFlinkService extends
AbstractFlinkService {
public JobID submitJobToSessionCluster(
ObjectMeta meta,
FlinkSessionJobSpec spec,
+ JobID jobID,
Configuration conf,
@Nullable String savepoint)
throws Exception {
@@ -258,7 +258,6 @@ public class TestingFlinkService extends
AbstractFlinkService {
if (deployFailure) {
throw new Exception("Deployment failure");
}
- JobID jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
JobStatusMessage jobStatusMessage =
new JobStatusMessage(
jobID,
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index 0705f70f..5c620256 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -116,7 +116,7 @@ class FlinkSessionJobControllerTest {
updateControl = testController.reconcile(sessionJob, context);
assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
- assertEquals(5, testController.getInternalStatusUpdateCount());
+ assertEquals(6, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
FlinkSessionJobReconciliationStatus reconciliationStatus =
@@ -574,7 +574,7 @@ class FlinkSessionJobControllerTest {
// Reconciling
assertEquals(
JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
- assertEquals(3, testController.getInternalStatusUpdateCount());
+ assertEquals(4, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
@@ -591,7 +591,7 @@ class FlinkSessionJobControllerTest {
// Running
updateControl = testController.reconcile(sessionJob, context);
assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
- assertEquals(4, testController.getInternalStatusUpdateCount());
+ assertEquals(5, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
@@ -601,7 +601,7 @@ class FlinkSessionJobControllerTest {
// Stable loop
updateControl = testController.reconcile(sessionJob, context);
assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
- assertEquals(4, testController.getInternalStatusUpdateCount());
+ assertEquals(5, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
index 8926492c..5afefd41 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkResourceContextFactory;
@@ -90,7 +91,8 @@ public class TestingFlinkSessionJobController
new FlinkSessionJobController(
ValidatorUtils.discoverValidators(configManager),
ctxFactory,
- new SessionJobReconciler(eventRecorder,
statusRecorder),
+ new SessionJobReconciler(
+ eventRecorder, statusRecorder, new
NoopJobAutoscaler<>()),
new FlinkSessionJobObserver(eventRecorder),
statusRecorder,
eventRecorder,
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
index 6de56566..c0005127 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
@@ -19,6 +19,7 @@ package
org.apache.flink.kubernetes.operator.observer.sessionjob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.RestOptions;
@@ -34,9 +35,7 @@ import
org.apache.flink.kubernetes.operator.observer.TestObserverAdapter;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import
org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -44,6 +43,8 @@ import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Map;
@@ -66,7 +67,9 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
observer = new TestObserverAdapter<>(this, new
FlinkSessionJobObserver(eventRecorder));
reconciler =
new TestReconcilerAdapter<>(
- this, new SessionJobReconciler(eventRecorder,
statusRecorder));
+ this,
+ new SessionJobReconciler(
+ eventRecorder, statusRecorder, new
NoopJobAutoscaler<>()));
}
@Test
@@ -268,146 +271,37 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
assertFalse(SnapshotUtils.checkpointInProgress(sessionJob.getStatus().getJobStatus()));
}
- @Test
- public void testObserveAlreadySubmitted() {
- final var sessionJob = TestUtils.buildSessionJob();
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testObserveAlreadySubmitted(boolean submitted) {
+ var sessionJob = TestUtils.buildSessionJob();
+
sessionJob.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
sessionJob.getMetadata().setGeneration(10L);
- final var readyContext =
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+ var readyContext =
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
flinkService.setSessionJobSubmittedCallback(
() -> {
throw new RuntimeException("Failed after submitted job");
});
- // submit job
+ // submit job but fail during submission
Assertions.assertThrows(
RuntimeException.class, () -> reconciler.reconcile(sessionJob,
readyContext));
-
Assertions.assertNotNull(sessionJob.getStatus().getReconciliationStatus());
- Assertions.assertEquals(
- ReconciliationState.UPGRADING,
- sessionJob.getStatus().getReconciliationStatus().getState());
-
Assertions.assertNull(sessionJob.getStatus().getJobStatus().getJobId());
-
- observer.observe(sessionJob, readyContext);
- Assertions.assertEquals(
- ReconciliationState.DEPLOYED,
- sessionJob.getStatus().getReconciliationStatus().getState());
- var jobID = sessionJob.getStatus().getJobStatus().getJobId();
- Assertions.assertNotNull(jobID);
- Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
- Assertions.assertEquals(
- JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
- }
-
- @Test
- public void testObserveAlreadyUpgraded() throws Exception {
- final var sessionJob = TestUtils.buildSessionJob();
- sessionJob.getMetadata().setGeneration(10L);
- final var readyContext =
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
-
- reconciler.reconcile(sessionJob, readyContext);
- observer.observe(sessionJob, readyContext);
- Assertions.assertEquals(
- ReconciliationState.DEPLOYED,
- sessionJob.getStatus().getReconciliationStatus().getState());
- var jobID = sessionJob.getStatus().getJobStatus().getJobId();
- Assertions.assertNotNull(jobID);
- Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
- Assertions.assertEquals(
- JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
-
- flinkService.setSessionJobSubmittedCallback(
- () -> {
- throw new RuntimeException("Failed after submitted job");
- });
- sessionJob.getSpec().getJob().setParallelism(10);
- sessionJob.getMetadata().setGeneration(11L);
-
- // upgrade
- Assertions.assertThrows(
- RuntimeException.class,
- () -> {
- // suspend
- reconciler.reconcile(sessionJob, readyContext);
- // upgrade
- reconciler.reconcile(sessionJob, readyContext);
- });
-
Assertions.assertEquals(
ReconciliationState.UPGRADING,
sessionJob.getStatus().getReconciliationStatus().getState());
- // jobID not changed
- Assertions.assertEquals(jobID,
sessionJob.getStatus().getJobStatus().getJobId());
-
- observer.observe(sessionJob, readyContext);
-
- Assertions.assertEquals(
- ReconciliationState.DEPLOYED,
- sessionJob.getStatus().getReconciliationStatus().getState());
- Assertions.assertEquals(
- 11L,
- JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId())
- .getUpperPart());
- }
- @Test
- public void testOrphanedJob() throws Exception {
- final var sessionJob = TestUtils.buildSessionJob();
- sessionJob.getMetadata().setGeneration(10L);
- final var readyContext = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+ if (!submitted) {
+ // Pretend that job was never submitted
+ flinkService.clear();
+ }
- reconciler.reconcile(sessionJob, readyContext);
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- ReconciliationState.DEPLOYED,
+ submitted ? ReconciliationState.DEPLOYED : ReconciliationState.UPGRADING,
sessionJob.getStatus().getReconciliationStatus().getState());
- var jobID = sessionJob.getStatus().getJobStatus().getJobId();
- Assertions.assertNotNull(jobID);
- Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
- Assertions.assertEquals(
- JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
-
- flinkService.setSessionJobSubmittedCallback(
- () -> {
- throw new RuntimeException("Failed after submitted job");
- });
- sessionJob.getSpec().getJob().setParallelism(10);
- sessionJob.getMetadata().setGeneration(11L);
- // upgrade
- Assertions.assertThrows(
- RuntimeException.class,
- () -> {
- // suspend
- reconciler.reconcile(sessionJob, readyContext);
- // upgrade
- reconciler.reconcile(sessionJob, readyContext);
- });
-
Assertions.assertEquals(
- ReconciliationState.UPGRADING,
- sessionJob.getStatus().getReconciliationStatus().getState());
- // jobID not changed
- Assertions.assertEquals(jobID, sessionJob.getStatus().getJobStatus().getJobId());
-
- // mock a job with different id of the target CR occurs
- var jobs = flinkService.listJobs();
- for (var job : jobs) {
- if (!job.f1.getJobState().isGloballyTerminalState()
- && !job.f1.getJobId().toHexString().equals(jobID)) {
- job.f1 =
- new JobStatusMessage(
- FlinkUtils.generateSessionJobFixedJobID(
- sessionJob.getMetadata().getUid(), -1L),
- job.f1.getJobName(),
- job.f1.getJobState(),
- job.f1.getStartTime());
- }
- }
-
- var exception =
- Assertions.assertThrows(
- RuntimeException.class, () -> observer.observe(sessionJob, readyContext));
- Assertions.assertTrue(
- exception.getMessage().contains("doesn't match upgrade target generation"));
+ submitted ? JobStatus.RUNNING.name() : JobStatus.RECONCILING.name(),
+ sessionJob.getStatus().getJobStatus().getState());
}
@Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index ab518429..96a6e6cb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -28,6 +29,7 @@ import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
+import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -92,7 +94,9 @@ public class SessionJobReconcilerTest extends OperatorTestBase {
configManager = new FlinkConfigManager(configuration);
reconciler =
new TestReconcilerAdapter<>(
- this, new SessionJobReconciler(eventRecorder, statusRecorder));
+ this,
+ new SessionJobReconciler(
+ eventRecorder, statusRecorder, new NoopJobAutoscaler<>()));
}
@Test
@@ -790,4 +794,41 @@ public class SessionJobReconcilerTest extends OperatorTestBase {
spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
assertEquals("FINISHED", spSessionJob.getStatus().getJobStatus().getState());
}
+
+ @Test
+ public void testJobIdGeneration() throws Exception {
+ var sessionJob = TestUtils.buildSessionJob();
+ sessionJob.getMetadata().setGeneration(10L);
+ var readyContext = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+
+ reconciler.reconcile(sessionJob, readyContext);
+ Assertions.assertEquals(
+ ReconciliationState.DEPLOYED,
+ sessionJob.getStatus().getReconciliationStatus().getState());
+ var jobID = sessionJob.getStatus().getJobStatus().getJobId();
+ Assertions.assertEquals(
+ RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ Assertions.assertEquals(jobID, flinkService.listJobs().get(0).f1.getJobId().toString());
+
+ flinkService.setSessionJobSubmittedCallback(
+ () -> {
+ throw new RuntimeException("Failed after submitted job");
+ });
+ sessionJob.getSpec().getJob().setParallelism(10);
+ // upgrade
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () -> {
+ // suspend
+ reconciler.reconcile(sessionJob, readyContext);
+ // upgrade
+ reconciler.reconcile(sessionJob, readyContext);
+ });
+
+ Assertions.assertEquals(
+ ReconciliationState.UPGRADING,
+ sessionJob.getStatus().getReconciliationStatus().getState());
+ // New jobID recorded despite failure
+ Assertions.assertNotEquals(jobID, sessionJob.getStatus().getJobStatus().getJobId());
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 81f7c67c..3739298f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -198,7 +198,8 @@ public class AbstractFlinkServiceTest {
var job = TestUtils.buildSessionJob();
var deployConf = configManager.getSessionJobConfig(session, job.getSpec());
- flinkService.submitJobToSessionCluster(job.getMetadata(), job.getSpec(), deployConf, null);
+ flinkService.submitJobToSessionCluster(
+ job.getMetadata(), job.getSpec(), JobID.generate(), deployConf, null);
// Make sure that deploy conf was passed to jar run
if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_16)) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 912fef5f..23e2204e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -355,13 +355,6 @@ public class FlinkUtilsTest {
assertEquals(List.of(v1merged, volume2, volume3), mergedPod.getSpec().getVolumes());
}
- @Test
- public void testJobIDGeneration() {
- JobID jobID =
- FlinkUtils.generateSessionJobFixedJobID("ffffffff-ffff-ffff-aaaa-aaaaaaaaaaaa", 2L);
- assertEquals("ffffffffffffffff0000000000000002", jobID.toString());
- }
-
private void createHAConfigMapWithData(
String configMapName, String namespace, String clusterId, Map<String, String> data) {
final ConfigMap kubernetesConfigMap =