This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 4f2f0d3 [FLINK-27257] Retry failed savepoints within grace period 4f2f0d3 is described below commit 4f2f0d392959f70bbb7b012403f1d6c9c16477cc Author: Gyula Fora <g_f...@apple.com> AuthorDate: Tue Jun 7 11:12:48 2022 +0200 [FLINK-27257] Retry failed savepoints within grace period --- .../kubernetes_operator_config_configuration.html | 12 +- .../config/FlinkOperatorConfiguration.java | 7 -- .../config/KubernetesOperatorConfigOptions.java | 8 +- .../operator/observer/SavepointObserver.java | 138 +++++++++++---------- .../operator/reconciler/ReconciliationUtils.java | 17 ++- .../kubernetes/operator/utils/SavepointUtils.java | 15 ++- .../operator/observer/SavepointObserverTest.java | 6 +- .../deployment/ApplicationObserverTest.java | 29 ++++- .../deployment/ApplicationReconcilerTest.java | 24 +++- .../sessionjob/FlinkSessionJobReconcilerTest.java | 17 +-- 10 files changed, 165 insertions(+), 108 deletions(-) diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html index 5e9b466..08c72d0 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html @@ -68,12 +68,6 @@ <td>Duration</td> <td>Final delay before deployment is marked ready after port becomes accessible.</td> </tr> - <tr> - <td><h5>kubernetes.operator.observer.savepoint.trigger.grace-period</h5></td> - <td style="word-wrap: break-word;">10 s</td> - <td>Duration</td> - <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td> - </tr> <tr> <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td> <td style="word-wrap: break-word;">0 ms</td> @@ -122,6 +116,12 @@ <td>Integer</td> <td>Maximum number of savepoint history entries to retain.</td> </tr> + <tr> + <td><h5>kubernetes.operator.savepoint.trigger.grace-period</h5></td> + <td style="word-wrap: break-word;">1 min</td> + <td>Duration</td> + <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td> + </tr> <tr> <td><h5>kubernetes.operator.user.artifacts.base.dir</h5></td> <td style="word-wrap: break-word;">"/opt/flink/artifacts"</td> diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java index 69a3ef8..36b9383 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java @@ -34,7 +34,6 @@ public class FlinkOperatorConfiguration { int reconcilerMaxParallelism; Duration progressCheckInterval; Duration restApiReadyDelay; - Duration savepointTriggerGracePeriod; Duration flinkClientTimeout; String flinkServiceHostOverride; Set<String> watchedNamespaces; @@ -62,11 +61,6 @@ public class FlinkOperatorConfiguration { operatorConfig.get( KubernetesOperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL); - Duration savepointTriggerGracePeriod = - operatorConfig.get( - KubernetesOperatorConfigOptions - .OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD); - Duration flinkClientTimeout = operatorConfig.get( KubernetesOperatorConfigOptions.OPERATOR_OBSERVER_FLINK_CLIENT_TIMEOUT); @@ -103,7 +97,6 @@ public class FlinkOperatorConfiguration { reconcilerMaxParallelism, progressCheckInterval, restApiReadyDelay, - savepointTriggerGracePeriod, flinkClientTimeout, flinkServiceHostOverride, watchedNamespaces, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index f5eaade..b8e03b2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -57,10 +57,12 @@ public class KubernetesOperatorConfigOptions { .withDescription( "The interval for observing status for in-progress operations such as deployment and savepoints."); - public static final ConfigOption<Duration> OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD = - ConfigOptions.key("kubernetes.operator.observer.savepoint.trigger.grace-period") + public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD = + ConfigOptions.key("kubernetes.operator.savepoint.trigger.grace-period") .durationType() - .defaultValue(Duration.ofSeconds(10)) + .defaultValue(Duration.ofMinutes(1)) + .withDeprecatedKeys( + "kubernetes.operator.observer.savepoint.trigger.grace-period") .withDescription( "The interval before a savepoint trigger attempt is marked as unsuccessful."); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java index a0b5788..dc129c7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java @@ -19,7 +19,6 @@ package org.apache.flink.kubernetes.operator.observer; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; @@ -27,12 +26,12 @@ import org.apache.flink.kubernetes.operator.crd.status.CommonStatus; import org.apache.flink.kubernetes.operator.crd.status.Savepoint; import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.EventUtils; import org.apache.flink.kubernetes.operator.utils.SavepointUtils; import org.apache.flink.kubernetes.operator.utils.StatusHelper; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +60,7 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> { public void observeSavepointStatus( AbstractFlinkResource<?, STATUS> resource, Configuration deployedConfig) { + var jobStatus = resource.getStatus().getJobStatus(); var savepointInfo = jobStatus.getSavepointInfo(); var jobId = jobStatus.getJobId(); @@ -69,96 +69,79 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> { .map(Savepoint::getLocation) .orElse(null); - observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig) - .ifPresent( - err -> - EventUtils.createOrUpdateEvent( - flinkService.getKubernetesClient(), - resource, - EventUtils.Type.Warning, - "SavepointError", - SavepointUtils.createSavepointError( - savepointInfo, - resource.getSpec() - .getJob() - .getSavepointTriggerNonce()), - EventUtils.Component.Operator)); - - // We only need to observe latest checkpoint/savepoint for terminal jobs - if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) { - observeLatestSavepoint(savepointInfo, jobId, deployedConfig); + // If any manual or periodic savepoint is in progress, observe it + if (SavepointUtils.savepointInProgress(jobStatus)) { + observeTriggeredSavepoint(resource, jobId, deployedConfig); } - var currentLastSpPath = - Optional.ofNullable(savepointInfo.getLastSavepoint()) - .map(Savepoint::getLocation) - .orElse(null); - - // If the last savepoint information changes we need to patch the status - // to avoid losing this in case of an operator failure after the cluster was shut down - if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) { - LOG.info( - "Updating resource status after observing new last savepoint {}", - currentLastSpPath); - statusHelper.patchAndCacheStatus(resource); + // If job is in globally terminal state, observe last savepoint + if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) { + observeLatestSavepoint(savepointInfo, jobId, deployedConfig); } + + patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath); } /** - * Observe the savepoint result based on the current savepoint info. + * Observe the status of manually triggered savepoints. * - * @param currentSavepointInfo the current savepoint info. + * @param resource the resource being observed * @param jobID the jobID of the observed job. * @param deployedConfig Deployed job config. - * @return The observed error, if no error observed, {@code Optional.empty()} will be returned. */ - private Optional<String> observeTriggeredSavepointProgress( - SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) { - if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) { - LOG.debug("Savepoint not in progress"); - return Optional.empty(); - } + private void observeTriggeredSavepoint( + AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) { + + var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo(); + LOG.info("Observing savepoint status."); - SavepointFetchResult savepointFetchResult = + var savepointFetchResult = flinkService.fetchSavepointInfo( - currentSavepointInfo.getTriggerId(), jobID, deployedConfig); + savepointInfo.getTriggerId(), jobID, deployedConfig); if (savepointFetchResult.isPending()) { - if (SavepointUtils.gracePeriodEnded( - configManager.getOperatorConfiguration(), currentSavepointInfo)) { - String errorMsg = - "Savepoint operation timed out after " - + configManager - .getOperatorConfiguration() - .getSavepointTriggerGracePeriod(); - currentSavepointInfo.resetTrigger(); - LOG.error(errorMsg); - return Optional.of(errorMsg); - } else { - LOG.info("Savepoint operation not finished yet, waiting within grace period..."); - return Optional.empty(); - } + LOG.info("Savepoint operation not finished yet..."); + return; } if (savepointFetchResult.getError() != null) { - currentSavepointInfo.resetTrigger(); - return Optional.of(savepointFetchResult.getError()); + var err = savepointFetchResult.getError(); + if (SavepointUtils.gracePeriodEnded(deployedConfig, savepointInfo)) { + LOG.error( + "Savepoint attempt failed after grace period. Won't be retried again: " + + err); + ReconciliationUtils.updateLastReconciledSavepointTriggerNonce( + savepointInfo, resource); + EventUtils.createOrUpdateEvent( + flinkService.getKubernetesClient(), + resource, + EventUtils.Type.Warning, + "SavepointError", + SavepointUtils.createSavepointError( + savepointInfo, + resource.getSpec().getJob().getSavepointTriggerNonce()), + EventUtils.Component.Operator); + } else { + LOG.warn("Savepoint failed within grace period, retrying: " + err); + } + savepointInfo.resetTrigger(); + return; } - LOG.info("Savepoint status updated with latest completed savepoint info"); var savepoint = new Savepoint( - currentSavepointInfo.getTriggerTimestamp(), + savepointInfo.getTriggerTimestamp(), savepointFetchResult.getLocation(), - currentSavepointInfo.getTriggerType()); - currentSavepointInfo.updateLastSavepoint(savepoint); + savepointInfo.getTriggerType()); - updateSavepointHistory(currentSavepointInfo, savepoint, deployedConfig); - return Optional.empty(); + ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, resource); + savepointInfo.updateLastSavepoint(savepoint); + cleanupSavepointHistory(savepointInfo, savepoint, deployedConfig); } + /** Clean up and dispose savepoints according to the configured max size/age. */ @VisibleForTesting - void updateSavepointHistory( + void cleanupSavepointHistory( SavepointInfo currentSavepointInfo, Savepoint newSavepoint, Configuration deployedConfig) { @@ -204,4 +187,27 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> { throw new ReconciliationException(e); } } + + /** + * Patch the Kubernetes Flink resource status if we observed a new last savepoint. This is + * crucial to not lose this information once the reconciler shuts down the cluster. + */ + private void patchStatusOnSavepointChange( + AbstractFlinkResource<?, STATUS> resource, + SavepointInfo savepointInfo, + String previousLastSpPath) { + var currentLastSpPath = + Optional.ofNullable(savepointInfo.getLastSavepoint()) + .map(Savepoint::getLocation) + .orElse(null); + + // If the last savepoint information changes we need to patch the status + // to avoid losing this in case of an operator failure after the cluster was shut down + if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) { + LOG.info( + "Updating resource status after observing new last savepoint {}", + currentLastSpPath); + statusHelper.patchAndCacheStatus(resource); + } + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java index 4a21170..cd28a57 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java @@ -33,6 +33,8 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; +import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo; +import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.kubernetes.operator.metrics.MetricManager; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -95,13 +97,18 @@ public class ReconciliationUtils { } } - public static <SPEC extends AbstractFlinkSpec> void updateSavepointReconciliationSuccess( - AbstractFlinkResource<SPEC, ?> target) { + public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSavepointTriggerNonce( + SavepointInfo savepointInfo, AbstractFlinkResource<SPEC, ?> target) { + + // We only need to update for MANUAL triggers + if (savepointInfo.getTriggerType() != SavepointTriggerType.MANUAL) { + return; + } + var commonStatus = target.getStatus(); var spec = target.getSpec(); - ReconciliationStatus<SPEC> reconciliationStatus = commonStatus.getReconciliationStatus(); - commonStatus.setError(null); - SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec(); + var reconciliationStatus = commonStatus.getReconciliationStatus(); + var lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec(); lastReconciledSpec .getJob() .setSavepointTriggerNonce(spec.getJob().getSavepointTriggerNonce()); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java index c4ca34e..6858281 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java @@ -19,7 +19,6 @@ package org.apache.flink.kubernetes.operator.utils; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; @@ -70,9 +69,6 @@ public class SavepointUtils { resource.getStatus().getJobStatus().getSavepointInfo(), conf); - if (triggerType == SavepointTriggerType.MANUAL) { - ReconciliationUtils.updateSavepointReconciliationSuccess(resource); - } return true; } @@ -135,10 +131,12 @@ public class SavepointUtils { return Optional.empty(); } - public static boolean gracePeriodEnded( - FlinkOperatorConfiguration configuration, SavepointInfo savepointInfo) { - var elapsed = System.currentTimeMillis() - savepointInfo.getTriggerTimestamp(); - return elapsed > configuration.getSavepointTriggerGracePeriod().toMillis(); + public static boolean gracePeriodEnded(Configuration conf, SavepointInfo savepointInfo) { + Duration gracePeriod = + conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD); + var endOfGracePeriod = + Instant.ofEpochMilli(savepointInfo.getTriggerTimestamp()).plus(gracePeriod); + return endOfGracePeriod.isBefore(Instant.now()); } public static void resetTriggerIfJobNotRunning( @@ -148,6 +146,7 @@ public class SavepointUtils { if (!ReconciliationUtils.isJobRunning(status) && SavepointUtils.savepointInProgress(jobStatus)) { var savepointInfo = jobStatus.getSavepointInfo(); + ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, resource); savepointInfo.resetTrigger(); LOG.error("Job is not running, cancelling savepoint operation"); EventUtils.createOrUpdateEvent( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java index e3ff8d0..ef335b5 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java @@ -53,7 +53,7 @@ public class SavepointObserverTest { Savepoint sp = new Savepoint(1, "sp1", SavepointTriggerType.MANUAL); spInfo.updateLastSavepoint(sp); - observer.updateSavepointHistory(spInfo, sp, configManager.getDefaultConfig()); + observer.cleanupSavepointHistory(spInfo, sp, configManager.getDefaultConfig()); Assertions.assertNotNull(spInfo.getSavepointHistory()); Assertions.assertIterableEquals( @@ -73,7 +73,7 @@ public class SavepointObserverTest { Savepoint sp1 = new Savepoint(1, "sp1", SavepointTriggerType.MANUAL); spInfo.updateLastSavepoint(sp1); - observer.updateSavepointHistory(spInfo, sp1, conf); + observer.cleanupSavepointHistory(spInfo, sp1, conf); Assertions.assertIterableEquals( Collections.singletonList(sp1), spInfo.getSavepointHistory()); Assertions.assertIterableEquals( @@ -81,7 +81,7 @@ public class SavepointObserverTest { Savepoint sp2 = new Savepoint(2, "sp2", SavepointTriggerType.MANUAL); spInfo.updateLastSavepoint(sp2); - observer.updateSavepointHistory(spInfo, sp2, conf); + observer.cleanupSavepointHistory(spInfo, sp2, conf); Assertions.assertIterableEquals( Collections.singletonList(sp2), spInfo.getSavepointHistory()); Assertions.assertIterableEquals( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index 3963b97..104801c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -39,6 +39,9 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.javaoperatorsdk.operator.api.reconciler.Context; import org.junit.jupiter.api.Test; +import java.time.Duration; +import java.time.Instant; + import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -185,7 +188,7 @@ public class ApplicationObserverTest { assertTrue(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus())); deployment.getStatus().getJobStatus().getSavepointInfo().setTriggerId("unknown"); - // savepoint error + // savepoint error within grace period assertEquals( 0, kubernetesClient @@ -197,6 +200,30 @@ public class ApplicationObserverTest { .size()); observer.observe(deployment, readyContext); assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus())); + assertEquals( + 0, + kubernetesClient + .v1() + .events() + .inNamespace(deployment.getMetadata().getNamespace()) + .list() + .getItems() + .size()); + + deployment.getStatus().getJobStatus().getSavepointInfo().setTriggerId("unknown"); + deployment + .getStatus() + .getJobStatus() + .getSavepointInfo() + .setTriggerType(SavepointTriggerType.MANUAL); + deployment + .getStatus() + .getJobStatus() + .getSavepointInfo() + .setTriggerTimestamp(Instant.now().minus(Duration.ofHours(1)).toEpochMilli()); + + observer.observe(deployment, readyContext); + assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus())); assertEquals( 1, kubernetesClient diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index f631141..81978fb 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -222,7 +222,6 @@ public class ApplicationReconcilerTest { verifyAndSetRunningJobsToStatus(deployment, runningJobs); assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus())); - // trigger savepoint FlinkDeployment spDeployment = ReconciliationUtils.clone(deployment); // don't trigger if nonce is missing @@ -235,6 +234,13 @@ public class ApplicationReconcilerTest { .getJob() .setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong()); reconciler.reconcile(spDeployment, context); + assertNull( + spDeployment + .getStatus() + .getReconciliationStatus() + .deserializeLastReconciledSpec() + .getJob() + .getSavepointTriggerNonce()); assertEquals( "trigger_0", spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId()); @@ -250,12 +256,16 @@ public class ApplicationReconcilerTest { spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerType()); spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger(); + ReconciliationUtils.updateLastReconciledSavepointTriggerNonce( + spDeployment.getStatus().getJobStatus().getSavepointInfo(), spDeployment); // don't trigger when nonce is the same reconciler.reconcile(spDeployment, context); assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus())); spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger(); + ReconciliationUtils.updateLastReconciledSavepointTriggerNonce( + spDeployment.getStatus().getJobStatus().getSavepointInfo(), spDeployment); // trigger when new nonce is defined spDeployment @@ -270,7 +280,19 @@ public class ApplicationReconcilerTest { SavepointTriggerType.MANUAL, spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerType()); + // re-trigger after reset + spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger(); + reconciler.reconcile(spDeployment, context); + assertEquals( + "trigger_2", + spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId()); + assertEquals( + SavepointTriggerType.MANUAL, + spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerType()); + spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger(); + ReconciliationUtils.updateLastReconciledSavepointTriggerNonce( + spDeployment.getStatus().getJobStatus().getSavepointInfo(), spDeployment); // don't trigger nonce is cleared spDeployment.getSpec().getJob().setSavepointTriggerNonce(null); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java index fd96602..8167292 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -278,8 +279,7 @@ public class FlinkSessionJobReconcilerTest { assertTrue(SavepointUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus())); // the last reconcile nonce updated - assertEquals( - 2L, + assertNull( sp1SessionJob .getStatus() .getReconciliationStatus() @@ -323,6 +323,8 @@ public class FlinkSessionJobReconcilerTest { .getParallelism()); sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger(); + ReconciliationUtils.updateLastReconciledSavepointTriggerNonce( + sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), sp1SessionJob); // running -> suspended reconciler.reconcile(sp1SessionJob, readyContext); @@ -345,20 +347,19 @@ public class FlinkSessionJobReconcilerTest { flinkService.listSessionJobs()); sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger(); - - // don't trigger when nonce is the same - sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(2L); - reconciler.reconcile(sp1SessionJob, readyContext); - assertFalse(SavepointUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus())); + ReconciliationUtils.updateLastReconciledSavepointTriggerNonce( + sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), sp1SessionJob); // trigger when new nonce is defined - sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(3L); + sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(4L); reconciler.reconcile(sp1SessionJob, readyContext); assertEquals( "trigger_1", sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId()); sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger(); + ReconciliationUtils.updateLastReconciledSavepointTriggerNonce( + sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), sp1SessionJob); // don't trigger when nonce is cleared sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(null);