This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new a738c81 [FLINK-26507] Allow session and last-state jobs to reconcile in any state
a738c81 is described below
commit a738c815a12fc647d24cbbd10a2f4f0a5b9f6900
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Mar 8 15:17:51 2022 +0100
[FLINK-26507] Allow session and last-state jobs to reconcile in any state
---
.../controller/FlinkDeploymentController.java | 26 ++--
.../observer/JobManagerDeploymentStatus.java | 6 +-
.../kubernetes/operator/observer/Observer.java | 68 +++++------
.../operator/reconciler/JobReconciler.java | 77 ++++++++----
.../kubernetes/operator/reconciler/Reconciler.java | 5 +-
.../operator/reconciler/ReconciliationUtils.java | 3 +-
.../operator/reconciler/SessionReconciler.java | 12 +-
.../kubernetes/operator/utils/FlinkUtils.java | 2 +-
.../kubernetes/operator/utils/SavepointUtils.java | 7 +-
.../flink/kubernetes/operator/TestUtils.java | 16 ++-
.../kubernetes/operator/TestingFlinkService.java | 5 +
.../controller/FlinkDeploymentControllerTest.java | 134 +++++++++++++++++++--
.../kubernetes/operator/observer/ObserverTest.java | 47 ++++----
.../operator/reconciler/JobReconcilerTest.java | 2 +
.../validation/DeploymentValidatorTest.java | 11 +-
15 files changed, 288 insertions(+), 133 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 2a2a8d3..195defd 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -119,24 +119,17 @@ public class FlinkDeploymentController
FlinkUtils.getEffectiveConfig(flinkApp,
defaultConfig.getFlinkConfig());
try {
- boolean readyToReconcile = observer.observe(flinkApp, context,
effectiveConfig);
- if (!readyToReconcile) {
- return flinkApp.getStatus()
- .getJobManagerDeploymentStatus()
- .toUpdateControl(flinkApp, operatorConfiguration);
- }
-
- UpdateControl<FlinkDeployment> updateControl =
- reconcilerFactory
- .getOrCreate(flinkApp)
- .reconcile(operatorNamespace, flinkApp, context,
effectiveConfig);
- return updateControl;
+ observer.observe(flinkApp, context, effectiveConfig);
+ reconcilerFactory
+ .getOrCreate(flinkApp)
+ .reconcile(operatorNamespace, flinkApp, context,
effectiveConfig);
} catch (DeploymentFailedException dfe) {
handleDeploymentFailed(flinkApp, dfe);
- return UpdateControl.updateStatus(flinkApp);
} catch (Exception e) {
throw new ReconciliationException(e);
}
+
+ return getUpdateControl(flinkApp);
}
private void handleDeploymentFailed(FlinkDeployment flinkApp,
DeploymentFailedException dfe) {
@@ -157,6 +150,13 @@ public class FlinkDeploymentController
.create(event);
}
+ private UpdateControl<FlinkDeployment> getUpdateControl(FlinkDeployment
deployment) {
+ return deployment
+ .getStatus()
+ .getJobManagerDeploymentStatus()
+ .toUpdateControl(deployment, operatorConfiguration);
+ }
+
@Override
public List<EventSource>
prepareEventSources(EventSourceContext<FlinkDeployment> ctx) {
Preconditions.checkNotNull(controllerConfig, "Controller config cannot
be null");
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
index 35a395d..4a7defc 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
@@ -48,6 +48,8 @@ public enum JobManagerDeploymentStatus {
switch (this) {
case DEPLOYING:
case READY:
+ case MISSING:
+ case ERROR:
return UpdateControl.updateStatus(flinkDeployment)
.rescheduleAfter(
operatorConfiguration.getReconcileIntervalInSec(),
@@ -57,10 +59,8 @@ public enum JobManagerDeploymentStatus {
.rescheduleAfter(
operatorConfiguration.getPortCheckIntervalInSec(),
TimeUnit.SECONDS);
- case MISSING:
- case ERROR:
default:
- return UpdateControl.noUpdate();
+ throw new RuntimeException("Unknown status: " + this);
}
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index 46fb983..fc0b894 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -48,6 +48,8 @@ public class Observer {
private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+ public static final String JOB_STATE_UNKNOWN = "UNKNOWN";
+
private final FlinkService flinkService;
private final FlinkOperatorConfiguration operatorConfiguration;
@@ -56,10 +58,20 @@ public class Observer {
this.operatorConfiguration = operatorConfiguration;
}
- public boolean observe(
- FlinkDeployment flinkApp, Context context, Configuration
effectiveConfig) {
+ public void observe(FlinkDeployment flinkApp, Context context,
Configuration effectiveConfig) {
observeJmDeployment(flinkApp, context, effectiveConfig);
- return isReadyToReconcile(flinkApp, effectiveConfig);
+ if (isApplicationClusterReady(flinkApp)) {
+ boolean jobFound = observeFlinkJobStatus(flinkApp,
effectiveConfig);
+ if (jobFound) {
+ observeSavepointStatus(flinkApp, effectiveConfig);
+ }
+ }
+ }
+
+ private boolean isApplicationClusterReady(FlinkDeployment dep) {
+ return dep.getSpec().getJob() != null
+ && dep.getStatus().getJobManagerDeploymentStatus()
+ == JobManagerDeploymentStatus.READY;
}
private void observeJmDeployment(
@@ -123,12 +135,6 @@ public class Observer {
}
private boolean observeFlinkJobStatus(FlinkDeployment flinkApp,
Configuration effectiveConfig) {
-
- // No need to observe job status for session clusters
- if (flinkApp.getSpec().getJob() == null) {
- return true;
- }
-
LOG.info("Getting job statuses for {}",
flinkApp.getMetadata().getName());
FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
@@ -137,32 +143,32 @@ public class Observer {
clusterJobStatuses = flinkService.listJobs(effectiveConfig);
} catch (Exception e) {
LOG.error("Exception while listing jobs", e);
- flinkAppStatus.getJobStatus().setState("UNKNOWN");
+ flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
return false;
}
if (clusterJobStatuses.isEmpty()) {
LOG.info("No jobs found on {} yet",
flinkApp.getMetadata().getName());
+ flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
return false;
- } else {
- updateJobStatus(flinkAppStatus.getJobStatus(), new
ArrayList<>(clusterJobStatuses));
- LOG.info("Job statuses updated for {}",
flinkApp.getMetadata().getName());
- return true;
}
+
+ updateJobStatus(flinkAppStatus.getJobStatus(), new
ArrayList<>(clusterJobStatuses));
+ LOG.info("Job statuses updated for {}",
flinkApp.getMetadata().getName());
+ return true;
}
- private boolean observeSavepointStatus(
- FlinkDeployment flinkApp, Configuration effectiveConfig) {
+ private void observeSavepointStatus(FlinkDeployment flinkApp,
Configuration effectiveConfig) {
SavepointInfo savepointInfo =
flinkApp.getStatus().getJobStatus().getSavepointInfo();
- if (savepointInfo.getTriggerId() == null) {
+ if (!SavepointUtils.savepointInProgress(flinkApp)) {
LOG.debug("Checkpointing not in progress");
- return true;
+ return;
}
SavepointFetchResult savepointFetchResult;
try {
savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp,
effectiveConfig);
} catch (Exception e) {
LOG.error("Exception while fetching savepoint info", e);
- return false;
+ return;
}
if (!savepointFetchResult.isTriggered()) {
@@ -173,37 +179,17 @@ public class Observer {
LOG.error(errorMsg);
savepointInfo.setTriggerId(null);
ReconciliationUtils.updateForReconciliationError(flinkApp,
errorMsg);
- return false;
+ return;
}
LOG.info("Savepoint operation not running, waiting within grace
period");
}
if (savepointFetchResult.getSavepoint() == null) {
LOG.info("Savepoint not completed yet");
- return false;
+ return;
}
savepointInfo.setLastSavepoint(savepointFetchResult.getSavepoint());
savepointInfo.setTriggerId(null);
- return true;
- }
-
- private boolean isReadyToReconcile(FlinkDeployment flinkApp, Configuration
effectiveConfig) {
- JobManagerDeploymentStatus jmDeploymentStatus =
- flinkApp.getStatus().getJobManagerDeploymentStatus();
-
- switch (jmDeploymentStatus) {
- case READY:
- return observeFlinkJobStatus(flinkApp, effectiveConfig)
- && observeSavepointStatus(flinkApp, effectiveConfig);
- case MISSING:
- case ERROR:
- return true;
- case DEPLOYING:
- case DEPLOYED_NOT_READY:
- return false;
- default:
- throw new RuntimeException("Unknown status: " +
jmDeploymentStatus);
- }
}
/** Update previous job status based on the job list from the cluster. */
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 6f6b597..df9a8d8 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -25,6 +25,7 @@ import
org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
@@ -36,12 +37,12 @@ import
org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
+
+import static
org.apache.flink.kubernetes.operator.observer.Observer.JOB_STATE_UNKNOWN;
/**
* Reconciler responsible for handling the job lifecycle according to the
desired and current
@@ -59,12 +60,13 @@ public class JobReconciler extends BaseReconciler {
}
@Override
- public UpdateControl<FlinkDeployment> reconcile(
+ public void reconcile(
String operatorNamespace,
FlinkDeployment flinkApp,
Context context,
Configuration effectiveConfig)
throws Exception {
+
FlinkDeploymentSpec lastReconciledSpec =
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
JobSpec jobSpec = flinkApp.getSpec().getJob();
@@ -76,14 +78,18 @@ public class JobReconciler extends BaseReconciler {
IngressUtils.updateIngressRules(
flinkApp, effectiveConfig, operatorNamespace,
kubernetesClient, false);
ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
- return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(
- flinkApp, operatorConfiguration);
+ return;
+ }
+
+ if (SavepointUtils.savepointInProgress(flinkApp)) {
+ LOG.info(
+ "Savepoint currently in progress for {}, delaying
reconcile...",
+ flinkApp.getMetadata().getName());
+ return;
}
- // TODO: following assumes that current job is running
- // What if it never enters running state due to bad deployment?
boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
- if (specChanged) {
+ if (specChanged && readyForSpecChanges(flinkApp)) {
JobState currentJobState = lastReconciledSpec.getJob().getState();
JobState desiredJobState = jobSpec.getState();
@@ -94,7 +100,7 @@ public class JobReconciler extends BaseReconciler {
}
if (desiredJobState.equals(JobState.SUSPENDED)) {
printCancelLogs(upgradeMode,
flinkApp.getMetadata().getName());
- cancelJob(flinkApp, upgradeMode, effectiveConfig);
+ suspendJob(flinkApp, upgradeMode, effectiveConfig);
}
}
if (currentJobState == JobState.SUSPENDED && desiredJobState ==
JobState.RUNNING) {
@@ -115,14 +121,29 @@ public class JobReconciler extends BaseReconciler {
}
}
ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
- } else if (SavepointUtils.shouldTriggerSavepoint(flinkApp)) {
+ } else if (SavepointUtils.shouldTriggerSavepoint(flinkApp) &&
isJobRunning(flinkApp)) {
triggerSavepoint(flinkApp, effectiveConfig);
ReconciliationUtils.updateSavepointReconciliationSuccess(flinkApp);
}
+ }
+
+ private boolean readyForSpecChanges(FlinkDeployment deployment) {
+ if (deployment.getSpec().getJob().getUpgradeMode() !=
UpgradeMode.SAVEPOINT) {
+ // Only savepoint upgrade mode needs a running job
+ return true;
+ }
+ return deployment.getStatus().getJobManagerDeploymentStatus()
+ == JobManagerDeploymentStatus.MISSING
+ || isJobRunning(deployment);
+ }
- return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(
- operatorConfiguration.getReconcileIntervalInSec(),
TimeUnit.SECONDS);
+ private boolean isJobRunning(FlinkDeployment deployment) {
+ FlinkDeploymentStatus status = deployment.getStatus();
+ JobManagerDeploymentStatus deploymentStatus =
status.getJobManagerDeploymentStatus();
+ return deploymentStatus == JobManagerDeploymentStatus.READY
+ && org.apache.flink.api.common.JobStatus.RUNNING
+ .name()
+ .equals(status.getJobStatus().getState());
}
private void deployFlinkJob(
@@ -134,13 +155,15 @@ public class JobReconciler extends BaseReconciler {
effectiveConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
}
flinkService.submitApplicationCluster(flinkApp, effectiveConfig);
+ flinkApp.getStatus().getJobStatus().setState(JOB_STATE_UNKNOWN);
+
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}
private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration
effectiveConfig)
throws Exception {
LOG.info("Upgrading running job");
final Optional<String> savepoint =
- cancelJob(flinkApp,
flinkApp.getSpec().getJob().getUpgradeMode(), effectiveConfig);
+ suspendJob(flinkApp,
flinkApp.getSpec().getJob().getUpgradeMode(), effectiveConfig);
deployFlinkJob(flinkApp, effectiveConfig, savepoint);
}
@@ -169,16 +192,24 @@ public class JobReconciler extends BaseReconciler {
}
}
- private Optional<String> cancelJob(
+ private Optional<String> suspendJob(
FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration
effectiveConfig)
throws Exception {
- Optional<String> savepointOpt =
- flinkService.cancelJob(
-
JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
- upgradeMode,
- effectiveConfig);
+
+ Optional<String> savepointOpt = Optional.empty();
+ if (upgradeMode == UpgradeMode.STATELESS) {
+ shutdown(flinkApp, effectiveConfig);
+ } else {
+ String jobIdString =
flinkApp.getStatus().getJobStatus().getJobId();
+ savepointOpt =
+ flinkService.cancelJob(
+ jobIdString != null ?
JobID.fromHexString(jobIdString) : null,
+ upgradeMode,
+ effectiveConfig);
+ }
+
JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
- jobStatus.setState("suspended");
+ jobStatus.setState(JobState.SUSPENDED.name());
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
savepointOpt.ifPresent(
location -> {
@@ -189,9 +220,7 @@ public class JobReconciler extends BaseReconciler {
@Override
protected void shutdown(FlinkDeployment flinkApp, Configuration
effectiveConfig) {
- if (org.apache.flink.api.common.JobStatus.RUNNING
- .name()
-
.equalsIgnoreCase(flinkApp.getStatus().getJobStatus().getState())) {
+ if (isJobRunning(flinkApp)) {
LOG.info("Job is running, attempting graceful shutdown.");
try {
flinkService.cancelJob(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
index f07a9d8..f42405a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
@@ -23,7 +23,6 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
/** The interface of reconciler. */
public interface Reconciler {
@@ -35,10 +34,8 @@ public interface Reconciler {
* @param flinkApp the FlinkDeployment resource that has been created or
updated
* @param context the context with which the operation is executed
* @param effectiveConfig the effective config of the flinkApp
- * @return UpdateControl to manage updates on the custom resource (usually
the status) after
- * reconciliation.
*/
- UpdateControl<FlinkDeployment> reconcile(
+ void reconcile(
String operatorNamespace,
FlinkDeployment flinkApp,
Context context,
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index d429f66..aa58336 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -34,7 +34,8 @@ public class ReconciliationUtils {
reconciliationStatus.setSuccess(true);
reconciliationStatus.setError(null);
FlinkDeploymentSpec clonedSpec = clone(flinkApp.getSpec());
- if (reconciliationStatus.getLastReconciledSpec() != null) {
+ if (reconciliationStatus.getLastReconciledSpec() != null
+ && reconciliationStatus.getLastReconciledSpec().getJob() !=
null) {
long oldSavepointTriggerNonce =
reconciliationStatus
.getLastReconciledSpec()
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index a4b64ce..58bc29f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -21,14 +21,12 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
-
-import java.util.concurrent.TimeUnit;
/**
* Reconciler responsible for handling the session cluster lifecycle according
to the desired and
@@ -44,7 +42,7 @@ public class SessionReconciler extends BaseReconciler {
}
@Override
- public UpdateControl<FlinkDeployment> reconcile(
+ public void reconcile(
String operatorNamespace,
FlinkDeployment flinkApp,
Context context,
@@ -56,6 +54,8 @@ public class SessionReconciler extends BaseReconciler {
if (lastReconciledSpec == null) {
flinkService.submitSessionCluster(flinkApp, effectiveConfig);
+ flinkApp.getStatus()
+
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
IngressUtils.updateIngressRules(
flinkApp, effectiveConfig, operatorNamespace,
kubernetesClient, false);
}
@@ -65,15 +65,13 @@ public class SessionReconciler extends BaseReconciler {
upgradeSessionCluster(flinkApp, effectiveConfig);
}
ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
- return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(
- operatorConfiguration.getReconcileIntervalInSec(),
TimeUnit.SECONDS);
}
private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration
effectiveConfig)
throws Exception {
flinkService.stopSessionCluster(flinkApp, effectiveConfig, false);
flinkService.submitSessionCluster(flinkApp, effectiveConfig);
+
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}
@Override
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 868b946..38da91f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -177,7 +177,7 @@ public class FlinkUtils {
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
.list();
- if (jmPodList.getItems().isEmpty()) {
+ if (jmPodList == null || jmPodList.getItems().isEmpty()) {
jobManagerRunning = false;
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index 9366472..8fe80e3 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -25,8 +25,13 @@ import java.util.concurrent.TimeUnit;
/** Savepoint utilities. */
public class SavepointUtils {
+
+ public static boolean savepointInProgress(FlinkDeployment flinkDeployment)
{
+ return
flinkDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId() !=
null;
+ }
+
public static boolean shouldTriggerSavepoint(FlinkDeployment
flinkDeployment) {
- if
(flinkDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId()
!= null) {
+ if (savepointInProgress(flinkDeployment)) {
return false;
}
return flinkDeployment.getSpec().getJob().getSavepointTriggerNonce()
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index d76b98a..3bb1251 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -17,7 +17,9 @@
package org.apache.flink.kubernetes.operator;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
+import
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
@@ -25,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import io.fabric8.kubernetes.api.model.Container;
@@ -39,7 +42,9 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/** Testing utilities. */
@@ -72,19 +77,26 @@ public class TestUtils {
JobSpec.builder()
.jarURI(SAMPLE_JAR)
.parallelism(1)
+ .upgradeMode(UpgradeMode.STATELESS)
.state(JobState.RUNNING)
.build());
return deployment;
}
public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
+ Map<String, String> conf = new HashMap<>();
+ conf.put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
+ conf.put(
+ HighAvailabilityOptions.HA_MODE.key(),
+ KubernetesHaServicesFactory.class.getCanonicalName());
+ conf.put(HighAvailabilityOptions.HA_STORAGE_PATH.key(), "test");
+
return FlinkDeploymentSpec.builder()
.image(IMAGE)
.imagePullPolicy(IMAGE_POLICY)
.serviceAccount(SERVICE_ACCOUNT)
.flinkVersion(FLINK_VERSION)
- .flinkConfiguration(
-
Collections.singletonMap(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2"))
+ .flinkConfiguration(conf)
.jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1,
null))
.taskManager(new TaskManagerSpec(new Resource(1, "2048m"),
null))
.build();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 5da2ec5..69a7f4c 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -89,6 +89,11 @@ public class TestingFlinkService extends FlinkService {
public Optional<String> cancelJob(JobID jobID, UpgradeMode upgradeMode,
Configuration conf)
throws Exception {
+ if (upgradeMode == UpgradeMode.LAST_STATE) {
+ jobs.clear();
+ return Optional.empty();
+ }
+
if (!jobs.removeIf(js -> js.f1.getJobId().equals(jobID))) {
throw new Exception("Job not found");
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 04e9925..a8ab582 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -42,7 +42,6 @@ import
io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
-import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -50,14 +49,13 @@ import org.junit.jupiter.api.Test;
import java.net.HttpURLConnection;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -78,7 +76,7 @@ public class FlinkDeploymentControllerTest {
@BeforeEach
public void setup() {
flinkService = new TestingFlinkService();
- mockServer = new KubernetesMockServer(new MockWebServer(), new
HashMap<>(), true);
+ mockServer = new KubernetesMockServer();
mockServer.init();
kubernetesClient = mockServer.createClient();
testController = createTestController(kubernetesClient, flinkService);
@@ -156,7 +154,6 @@ public class FlinkDeploymentControllerTest {
@Test
public void verifyFailedDeployment() throws Exception {
-
mockServer
.expect()
.post()
@@ -169,11 +166,15 @@ public class FlinkDeploymentControllerTest {
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
UpdateControl<FlinkDeployment> updateControl;
- updateControl =
- testController.reconcile(
- appCluster,
TestUtils.createContextWithFailedJobManagerDeployment());
+ updateControl = testController.reconcile(appCluster,
TestUtils.createEmptyContext());
+ testController.reconcile(
+ appCluster,
TestUtils.createContextWithFailedJobManagerDeployment());
assertTrue(updateControl.isUpdateStatus());
- assertEquals(Optional.empty(), updateControl.getScheduleDelay());
+ assertEquals(
+ JobManagerDeploymentStatus.ERROR
+ .toUpdateControl(appCluster, operatorConfiguration)
+ .getScheduleDelay(),
+ updateControl.getScheduleDelay());
RecordedRequest recordedRequest = mockServer.getLastRequest();
assertEquals("POST", recordedRequest.getMethod());
@@ -265,6 +266,9 @@ public class FlinkDeploymentControllerTest {
assertEquals(1, jobs.size());
assertEquals("s0", jobs.get(0).f0);
+ testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, context);
+
// Upgrade job
appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().getJob().setParallelism(100);
@@ -272,7 +276,7 @@ public class FlinkDeploymentControllerTest {
UpdateControl<FlinkDeployment> updateControl =
testController.reconcile(appCluster, context);
assertEquals(
- JobManagerDeploymentStatus.DEPLOYED_NOT_READY
+ JobManagerDeploymentStatus.DEPLOYING
.toUpdateControl(appCluster, operatorConfiguration)
.getScheduleDelay(),
updateControl.getScheduleDelay());
@@ -281,6 +285,9 @@ public class FlinkDeploymentControllerTest {
assertEquals(1, jobs.size());
assertEquals(null, jobs.get(0).f0);
+ testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, context);
+
// Suspend job
appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
@@ -296,6 +303,113 @@ public class FlinkDeploymentControllerTest {
}
@Test
+ public void testUpgradeNotReadyCluster() {
+ testUpgradeNotReadyCluster(TestUtils.buildSessionCluster(), true);
+
+ FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+ appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+ testUpgradeNotReadyCluster(appCluster, true);
+
+ appCluster = TestUtils.buildApplicationCluster();
+ appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ testUpgradeNotReadyCluster(appCluster, true);
+
+ appCluster = TestUtils.buildApplicationCluster();
+ appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+ testUpgradeNotReadyCluster(appCluster, false);
+ }
+
+ public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean
allowUpgrade) {
+ mockServer
+ .expect()
+ .delete()
+
.withPath("/apis/apps/v1/namespaces/flink-operator-test/deployments/test-cluster")
+ .andReturn(
+ HttpURLConnection.HTTP_CREATED,
+ new
EventBuilder().withNewMetadata().endMetadata().build())
+ .always();
+
+ mockServer
+ .expect()
+ .get()
+ .withPath(
+
"/api/v1/namespaces/flink-operator-test/pods?labelSelector=app%3Dtest-cluster%2Ccomponent%3Djobmanager%2Ctype%3Dflink-native-kubernetes")
+ .andReturn(
+ HttpURLConnection.HTTP_CREATED,
+ new
EventBuilder().withNewMetadata().endMetadata().build())
+ .always();
+
+ testController.reconcile(appCluster, TestUtils.createEmptyContext());
+ assertEquals(
+ appCluster.getSpec(),
+
appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec());
+
+ flinkService.setPortReady(false);
+ testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+
+ // trigger change
+
appCluster.getSpec().setServiceAccount(appCluster.getSpec().getServiceAccount()
+ "-2");
+
+ // Verify that even in DEPLOYING state we still redeploy
+ testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, context);
+ if (allowUpgrade) {
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(
+ appCluster.getSpec(),
+
appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec());
+
+ flinkService.setPortReady(true);
+ testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, context);
+
+ if (appCluster.getSpec().getJob() != null) {
+ assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ } else {
+ assertNull(appCluster.getStatus().getJobStatus().getState());
+ }
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ } else {
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertNotEquals(
+ appCluster.getSpec(),
+
appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec());
+
+ flinkService.setPortReady(true);
+ testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, context);
+
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(
+ appCluster.getSpec(),
+
appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec());
+
+ testController.reconcile(appCluster, context);
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+
+ testController.reconcile(appCluster, context);
+
+ assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ }
+ }
+
+ @Test
public void testPrepareEventSource() {
// Test watch all
testController.setControllerConfig(
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
index 0b4289f..4a3a425 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -32,9 +33,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
/** @link Observer unit tests */
public class ObserverTest {
@@ -54,21 +53,19 @@ public class ObserverTest {
.getReconciliationStatus()
.setLastReconciledSpec(deployment.getSpec());
- assertFalse(
- observer.observe(
- deployment,
- readyContext,
- FlinkUtils.getEffectiveConfig(deployment, new
Configuration())));
+ observer.observe(
+ deployment,
+ readyContext,
+ FlinkUtils.getEffectiveConfig(deployment, new
Configuration()));
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- assertTrue(
- observer.observe(
- deployment,
- readyContext,
- FlinkUtils.getEffectiveConfig(deployment, new
Configuration())));
+ observer.observe(
+ deployment,
+ readyContext,
+ FlinkUtils.getEffectiveConfig(deployment, new
Configuration()));
assertEquals(
JobManagerDeploymentStatus.READY,
@@ -85,7 +82,7 @@ public class ObserverTest {
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new
Configuration());
- assertTrue(observer.observe(deployment,
TestUtils.createEmptyContext(), conf));
+ observer.observe(deployment, TestUtils.createEmptyContext(), conf);
deployment.setStatus(new FlinkDeploymentStatus());
deployment
@@ -99,33 +96,35 @@ public class ObserverTest {
flinkService.setPortReady(false);
// Port not ready
- assertFalse(observer.observe(deployment, readyContext, conf));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
- assertFalse(observer.observe(deployment, readyContext, conf));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
flinkService.setPortReady(true);
// Port ready but we have to recheck once again
- assertFalse(observer.observe(deployment, readyContext, conf));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
// Stable ready
- assertTrue(observer.observe(deployment, readyContext, conf));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(JobState.RUNNING.name(),
deployment.getStatus().getJobStatus().getState());
- assertTrue(observer.observe(deployment, readyContext, conf));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(JobState.RUNNING.name(),
deployment.getStatus().getJobStatus().getState());
assertEquals(
deployment.getMetadata().getName(),
@@ -133,11 +132,11 @@ public class ObserverTest {
// Test listing failure
flinkService.clear();
- assertFalse(observer.observe(deployment, readyContext, conf));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- assertEquals("UNKNOWN",
deployment.getStatus().getJobStatus().getState());
+ assertEquals(Observer.JOB_STATE_UNKNOWN,
deployment.getStatus().getJobStatus().getState());
}
@Test
@@ -151,7 +150,7 @@ public class ObserverTest {
Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new
Configuration());
flinkService.submitApplicationCluster(deployment, conf);
bringToReadyStatus(deployment);
- assertTrue(observer.observe(deployment, readyContext, conf));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
@@ -160,7 +159,7 @@ public class ObserverTest {
assertEquals(
"trigger_0",
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
- assertTrue(observer.observe(deployment, readyContext, conf));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
"savepoint_0",
deployment
@@ -175,7 +174,7 @@ public class ObserverTest {
assertEquals(
"trigger_1",
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
- assertTrue(observer.observe(deployment, readyContext, conf));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
"savepoint_1",
deployment
@@ -195,7 +194,7 @@ public class ObserverTest {
JobStatus jobStatus = new JobStatus();
jobStatus.setJobName("jobname");
jobStatus.setJobId("0000000000");
- jobStatus.setState("RUNNING");
+ jobStatus.setState(JobState.RUNNING.name());
deployment.getStatus().setJobStatus(jobStatus);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index c712501..40aaf3b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -26,6 +26,7 @@ import
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -176,5 +177,6 @@ public class JobReconcilerTest {
jobStatus.setState("RUNNING");
deployment.getStatus().setJobStatus(jobStatus);
+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index bfeefb6..31d7d25 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
@@ -62,7 +63,10 @@ public class DeploymentValidatorTest {
dep -> dep.getSpec().getJob().setParallelism(-1),
"Job parallelism must be larger than 0");
testError(
- dep ->
dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE),
+ dep -> {
+ dep.getSpec().setFlinkConfiguration(new HashMap<>());
+
dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ },
"Job could not be upgraded with last-state while HA disabled");
// Test conf validation
@@ -93,7 +97,10 @@ public class DeploymentValidatorTest {
"Invalid log config key");
testError(
- dep -> dep.getSpec().getJobManager().setReplicas(2),
+ dep -> {
+ dep.getSpec().setFlinkConfiguration(new HashMap<>());
+ dep.getSpec().getJobManager().setReplicas(2);
+ },
"High availability should be enabled when starting standby
JobManagers.");
testError(
dep -> dep.getSpec().getJobManager().setReplicas(0),