This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 49debb1 [FLINK-26830] Transition to SUSPEND before redeploying on job upgrades
49debb1 is described below
commit 49debb14ef530cae79d5d75f81029629a6afed4c
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Mar 23 15:31:06 2022 +0100
[FLINK-26830] Transition to SUSPEND before redeploying on job upgrades
---
.../controller/FlinkDeploymentController.java | 12 ++----
.../operator/reconciler/JobReconciler.java | 32 ++++++---------
.../operator/reconciler/ReconciliationUtils.java | 41 +++++++++++++++++--
.../operator/reconciler/SessionReconciler.java | 2 +-
.../controller/FlinkDeploymentControllerTest.java | 26 +++++++++++-
.../operator/reconciler/JobReconcilerTest.java | 10 +++++
.../operator/utils/ReconciliationUtilsTest.java | 46 ++++++++++++++++------
7 files changed, 125 insertions(+), 44 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index aad86a1..6e4e587 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -47,7 +47,6 @@ import
io.javaoperatorsdk.operator.processing.event.source.EventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -112,7 +111,8 @@ public class FlinkDeploymentController
if (validationError.isPresent()) {
LOG.error("Validation failed: " + validationError.get());
ReconciliationUtils.updateForReconciliationError(flinkApp,
validationError.get());
- return ReconciliationUtils.toUpdateControl(originalCopy, flinkApp);
+ return ReconciliationUtils.toUpdateControl(
+ operatorConfiguration, originalCopy, flinkApp, false);
}
Configuration effectiveConfig =
@@ -127,12 +127,8 @@ public class FlinkDeploymentController
throw new ReconciliationException(e);
}
- Duration rescheduleAfter =
- flinkApp.getStatus()
- .getJobManagerDeploymentStatus()
- .rescheduleAfter(flinkApp, operatorConfiguration);
- return ReconciliationUtils.toUpdateControl(originalCopy, flinkApp)
- .rescheduleAfter(rescheduleAfter.toMillis());
+ return ReconciliationUtils.toUpdateControl(
+ operatorConfiguration, originalCopy, flinkApp, true);
}
private void handleDeploymentFailed(FlinkDeployment flinkApp,
DeploymentFailedException dfe) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 0eb7608..608ebeb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -72,7 +72,7 @@ public class JobReconciler extends BaseReconciler {
effectiveConfig,
Optional.ofNullable(jobSpec.getInitialSavepointPath()));
IngressUtils.updateIngressRules(flinkApp, effectiveConfig,
kubernetesClient);
- ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, JobState.RUNNING);
return;
}
@@ -87,14 +87,14 @@ public class JobReconciler extends BaseReconciler {
JobState desiredJobState = jobSpec.getState();
UpgradeMode upgradeMode = jobSpec.getUpgradeMode();
+ JobState stateAfterReconcile = currentJobState;
if (currentJobState == JobState.RUNNING) {
if (desiredJobState == JobState.RUNNING) {
- upgradeFlinkJob(flinkApp, effectiveConfig);
- }
- if (desiredJobState.equals(JobState.SUSPENDED)) {
- printCancelLogs(upgradeMode, flinkApp.getMetadata().getName());
- suspendJob(flinkApp, upgradeMode, effectiveConfig);
+ LOG.info("Upgrading running job, suspending first...");
}
+ printCancelLogs(upgradeMode, flinkApp.getMetadata().getName());
+ suspendJob(flinkApp, upgradeMode, effectiveConfig);
+ stateAfterReconcile = JobState.SUSPENDED;
}
if (currentJobState == JobState.SUSPENDED && desiredJobState ==
JobState.RUNNING) {
if (upgradeMode == UpgradeMode.STATELESS) {
@@ -103,9 +103,10 @@ public class JobReconciler extends BaseReconciler {
|| upgradeMode == UpgradeMode.SAVEPOINT) {
restoreFromLastSavepoint(flinkApp, effectiveConfig);
}
+ stateAfterReconcile = JobState.RUNNING;
}
IngressUtils.updateIngressRules(flinkApp, effectiveConfig,
kubernetesClient);
- ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, stateAfterReconcile);
} else if (SavepointUtils.shouldTriggerSavepoint(flinkApp) &&
isJobRunning(flinkApp)) {
triggerSavepoint(flinkApp, effectiveConfig);
ReconciliationUtils.updateSavepointReconciliationSuccess(flinkApp);
@@ -144,21 +145,14 @@ public class JobReconciler extends BaseReconciler {
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}
- private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
- throws Exception {
- LOG.info("Upgrading running job");
- final Optional<String> savepoint =
- suspendJob(flinkApp, flinkApp.getSpec().getJob().getUpgradeMode(), effectiveConfig);
- deployFlinkJob(flinkApp, effectiveConfig, savepoint);
- }
-
private void restoreFromLastSavepoint(FlinkDeployment flinkApp,
Configuration effectiveConfig)
throws Exception {
JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
- deployFlinkJob(
- flinkApp,
- effectiveConfig,
- Optional.of(jobStatus.getSavepointInfo().getLastSavepoint().getLocation()));
+ Optional<String> savepointOpt =
+ Optional.ofNullable(jobStatus.getSavepointInfo().getLastSavepoint())
+ .flatMap(s -> Optional.ofNullable(s.getLocation()));
+
+ deployFlinkJob(flinkApp, effectiveConfig, savepointOpt);
}
private void printCancelLogs(UpgradeMode upgradeMode, String name) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index f7a19c5..219ebbd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -17,14 +17,17 @@
package org.apache.flink.kubernetes.operator.reconciler;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import java.time.Duration;
import java.util.Objects;
/** Reconciliation utilities. */
@@ -32,7 +35,8 @@ public class ReconciliationUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
- public static void updateForSpecReconciliationSuccess(FlinkDeployment flinkApp) {
+ public static void updateForSpecReconciliationSuccess(
+ FlinkDeployment flinkApp, JobState stateAfterReconcile) {
ReconciliationStatus reconciliationStatus =
flinkApp.getStatus().getReconciliationStatus();
reconciliationStatus.setSuccess(true);
reconciliationStatus.setError(null);
@@ -45,6 +49,7 @@ public class ReconciliationUtils {
.getJob()
.getSavepointTriggerNonce();
clonedSpec.getJob().setSavepointTriggerNonce(oldSavepointTriggerNonce);
+ clonedSpec.getJob().setState(stateAfterReconcile);
}
reconciliationStatus.setLastReconciledSpec(clonedSpec);
}
@@ -79,7 +84,10 @@ public class ReconciliationUtils {
}
public static UpdateControl<FlinkDeployment> toUpdateControl(
- FlinkDeployment originalCopy, FlinkDeployment current) {
+ FlinkOperatorConfiguration operatorConfiguration,
+ FlinkDeployment originalCopy,
+ FlinkDeployment current,
+ boolean reschedule) {
UpdateControl<FlinkDeployment> updateControl;
if (!Objects.equals(originalCopy.getSpec(), current.getSpec())) {
throw new UnsupportedOperationException(
@@ -93,6 +101,33 @@ public class ReconciliationUtils {
} else {
updateControl = UpdateControl.noUpdate();
}
- return updateControl;
+
+ if (!reschedule) {
+ return updateControl;
+ }
+
+ if (isJobUpgradeInProgress(current)) {
+ return updateControl.rescheduleAfter(0);
+ }
+
+ Duration rescheduleAfter =
+ current.getStatus()
+ .getJobManagerDeploymentStatus()
+ .rescheduleAfter(current, operatorConfiguration);
+
+ return updateControl.rescheduleAfter(rescheduleAfter.toMillis());
+ }
+
+ private static boolean isJobUpgradeInProgress(FlinkDeployment current) {
+ ReconciliationStatus reconciliationStatus = current.getStatus().getReconciliationStatus();
+
+ if (reconciliationStatus == null || current.getSpec().getJob() == null) {
+ return false;
+ }
+
+ return current.getSpec().getJob().getState() == JobState.RUNNING
+ && reconciliationStatus.isSuccess()
+ && reconciliationStatus.getLastReconciledSpec().getJob().getState()
+ == JobState.SUSPENDED;
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 75ce176..7192b12 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -64,7 +64,7 @@ public class SessionReconciler extends BaseReconciler {
upgradeSessionCluster(flinkApp, effectiveConfig);
IngressUtils.updateIngressRules(flinkApp, effectiveConfig,
kubernetesClient);
}
- ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
+ ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, null);
}
private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration
effectiveConfig)
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 96eedd0..e984bf6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -283,7 +283,17 @@ public class FlinkDeploymentControllerTest {
appCluster = ReconciliationUtils.clone(appCluster);
appCluster.getSpec().getJob().setParallelism(100);
- testController.reconcile(appCluster, context);
+ assertEquals(0, testController.reconcile(appCluster, context).getScheduleDelay().get());
+ assertEquals(
+ JobState.SUSPENDED,
+ appCluster
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getJob()
+ .getState());
+
+ testController.reconcile(appCluster, TestUtils.createEmptyContext());
jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
assertEquals("savepoint_0", jobs.get(0).f0);
@@ -331,11 +341,23 @@ public class FlinkDeploymentControllerTest {
UpdateControl<FlinkDeployment> updateControl =
testController.reconcile(appCluster, context);
+ assertEquals(0, updateControl.getScheduleDelay().get());
+ assertEquals(
+ JobState.SUSPENDED,
+ appCluster
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getJob()
+ .getState());
+
+ updateControl = testController.reconcile(appCluster, context);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING
.rescheduleAfter(appCluster, operatorConfiguration)
.toMillis(),
updateControl.getScheduleDelay().get());
+
testController.reconcile(appCluster, context);
jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
@@ -404,7 +426,6 @@ public class FlinkDeploymentControllerTest {
flinkService.setPortReady(true);
testController.reconcile(appCluster, context);
testController.reconcile(appCluster, context);
-
if (appCluster.getSpec().getJob() != null) {
assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
} else {
@@ -424,6 +445,7 @@ public class FlinkDeploymentControllerTest {
flinkService.setPortReady(true);
testController.reconcile(appCluster, context);
testController.reconcile(appCluster, context);
+ testController.reconcile(appCluster, TestUtils.createEmptyContext());
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 691ad67..d201961 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -67,6 +67,11 @@ public class JobReconcilerTest {
reconciler.reconcile(statelessUpgrade, context, config);
runningJobs = flinkService.listJobs();
+ assertEquals(0, runningJobs.size());
+
+ reconciler.reconcile(statelessUpgrade, context, config);
+
+ runningJobs = flinkService.listJobs();
assertEquals(1, runningJobs.size());
assertNull(runningJobs.get(0).f0);
@@ -83,6 +88,11 @@ public class JobReconcilerTest {
reconciler.reconcile(statefulUpgrade, context, new
Configuration(config));
runningJobs = flinkService.listJobs();
+ assertEquals(0, runningJobs.size());
+
+ reconciler.reconcile(statefulUpgrade, context, new Configuration(config));
+
+ runningJobs = flinkService.listJobs();
assertEquals(1, runningJobs.size());
assertEquals("savepoint_0", runningJobs.get(0).f0);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 1558e12..76bd26a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -17,16 +17,17 @@
package org.apache.flink.kubernetes.operator.utils;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.junit.jupiter.api.Test;
-import java.util.concurrent.TimeUnit;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,26 +36,30 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for {@link
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils}. */
public class ReconciliationUtilsTest {
+ FlinkOperatorConfiguration operatorConfiguration =
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration());
+
@Test
public void testSpecChangedException() {
FlinkDeployment previous = TestUtils.buildApplicationCluster();
- FlinkDeployment current =
- org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.clone(previous);
+ FlinkDeployment current = ReconciliationUtils.clone(previous);
current.getSpec().setImage("changed-image");
assertThrows(
UnsupportedOperationException.class,
- () -> ReconciliationUtils.toUpdateControl(previous, current));
+ () ->
+ ReconciliationUtils.toUpdateControl(
+ operatorConfiguration, previous, current, true));
}
@Test
public void testStatusChanged() {
FlinkDeployment previous = TestUtils.buildApplicationCluster();
- FlinkDeployment current =
- org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.clone(previous);
+ FlinkDeployment current = ReconciliationUtils.clone(previous);
UpdateControl<FlinkDeployment> updateControl =
- ReconciliationUtils.toUpdateControl(previous, current);
+ ReconciliationUtils.toUpdateControl(
+ operatorConfiguration, previous, current, false);
assertFalse(updateControl.isUpdateResource());
assertFalse(updateControl.isUpdateStatus());
@@ -63,10 +68,29 @@ public class ReconciliationUtilsTest {
// status changed
current.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
updateControl =
- ReconciliationUtils.toUpdateControl(previous, current)
- .rescheduleAfter(10, TimeUnit.MILLISECONDS);
+ ReconciliationUtils.toUpdateControl(operatorConfiguration, previous, current, true);
+ assertFalse(updateControl.isUpdateResource());
+ assertTrue(updateControl.isUpdateStatus());
+ assertEquals(
+ operatorConfiguration.getProgressCheckInterval().toMillis(),
+ updateControl.getScheduleDelay().get());
+ }
+
+ @Test
+ public void testRescheduleUpgradeImmediately() {
+ FlinkDeployment app = TestUtils.buildApplicationCluster();
+ app.getSpec().getJob().setState(JobState.RUNNING);
+ FlinkDeployment current = ReconciliationUtils.clone(app);
+ current.getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(ReconciliationUtils.clone(current.getSpec()));
+ ReconciliationUtils.updateForSpecReconciliationSuccess(current, JobState.SUSPENDED);
+
+ UpdateControl<FlinkDeployment> updateControl =
+ ReconciliationUtils.toUpdateControl(operatorConfiguration, app, current, true);
+
assertFalse(updateControl.isUpdateResource());
assertTrue(updateControl.isUpdateStatus());
- assertEquals(10, updateControl.getScheduleDelay().get());
+ assertEquals(0, updateControl.getScheduleDelay().get());
}
}