Aitozi commented on code in PR #255: URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890243163
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java: ########## @@ -69,96 +69,78 @@ public void observeSavepointStatus( .map(Savepoint::getLocation) .orElse(null); - observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig) - .ifPresent( - err -> - EventUtils.createOrUpdateEvent( - flinkService.getKubernetesClient(), - resource, - EventUtils.Type.Warning, - "SavepointError", - SavepointUtils.createSavepointError( - savepointInfo, - resource.getSpec() - .getJob() - .getSavepointTriggerNonce()), - EventUtils.Component.Operator)); - - // We only need to observe latest checkpoint/savepoint for terminal jobs - if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) { - observeLatestSavepoint(savepointInfo, jobId, deployedConfig); + // If any manual or periodic savepoint is in progress, observe it + if (SavepointUtils.savepointInProgress(jobStatus)) { + observeTriggeredSavepoint(resource, jobId, deployedConfig); } - var currentLastSpPath = - Optional.ofNullable(savepointInfo.getLastSavepoint()) - .map(Savepoint::getLocation) - .orElse(null); - - // If the last savepoint information changes we need to patch the status - // to avoid losing this in case of an operator failure after the cluster was shut down - if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) { - LOG.info( - "Updating resource status after observing new last savepoint {}", - currentLastSpPath); - statusHelper.patchAndCacheStatus(resource); + // If job is in globally terminal state, observe last savepoint + if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) { + observeLatestSavepoint(savepointInfo, jobId, deployedConfig); } + + patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath); } /** * Observe the savepoint result based on the current savepoint info. * - * @param currentSavepointInfo the current savepoint info. 
+ * @param resource the resource being observed * @param jobID the jobID of the observed job. * @param deployedConfig Deployed job config. * @return The observed error, if no error observed, {@code Optional.empty()} will be returned. */ - private Optional<String> observeTriggeredSavepointProgress( - SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) { - if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) { - LOG.debug("Savepoint not in progress"); - return Optional.empty(); - } + private void observeTriggeredSavepoint( + AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) { + + var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo(); + LOG.info("Observing savepoint status."); - SavepointFetchResult savepointFetchResult = + var savepointFetchResult = flinkService.fetchSavepointInfo( - currentSavepointInfo.getTriggerId(), jobID, deployedConfig); + savepointInfo.getTriggerId(), jobID, deployedConfig); if (savepointFetchResult.isPending()) { Review Comment: If the savepoint is still pending, the grace period is no longer checked, so the savepoint may now take longer than the grace period. ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java: ########## @@ -69,96 +69,78 @@ public void observeSavepointStatus( .map(Savepoint::getLocation) .orElse(null); - observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig) - .ifPresent( - err -> - EventUtils.createOrUpdateEvent( - flinkService.getKubernetesClient(), - resource, - EventUtils.Type.Warning, - "SavepointError", - SavepointUtils.createSavepointError( - savepointInfo, - resource.getSpec() - .getJob() - .getSavepointTriggerNonce()), - EventUtils.Component.Operator)); - - // We only need to observe latest checkpoint/savepoint for terminal jobs - if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) { - observeLatestSavepoint(savepointInfo, 
jobId, deployedConfig); + // If any manual or periodic savepoint is in progress, observe it + if (SavepointUtils.savepointInProgress(jobStatus)) { + observeTriggeredSavepoint(resource, jobId, deployedConfig); } - var currentLastSpPath = - Optional.ofNullable(savepointInfo.getLastSavepoint()) - .map(Savepoint::getLocation) - .orElse(null); - - // If the last savepoint information changes we need to patch the status - // to avoid losing this in case of an operator failure after the cluster was shut down - if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) { - LOG.info( - "Updating resource status after observing new last savepoint {}", - currentLastSpPath); - statusHelper.patchAndCacheStatus(resource); + // If job is in globally terminal state, observe last savepoint + if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) { + observeLatestSavepoint(savepointInfo, jobId, deployedConfig); } + + patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath); } /** * Observe the savepoint result based on the current savepoint info. * - * @param currentSavepointInfo the current savepoint info. + * @param resource the resource being observed * @param jobID the jobID of the observed job. * @param deployedConfig Deployed job config. * @return The observed error, if no error observed, {@code Optional.empty()} will be returned. 
Review Comment: This method no longer returns a value, so the `@return` tag is outdated. ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java: ########## @@ -69,96 +69,78 @@ public void observeSavepointStatus( .map(Savepoint::getLocation) .orElse(null); - observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig) - .ifPresent( - err -> - EventUtils.createOrUpdateEvent( - flinkService.getKubernetesClient(), - resource, - EventUtils.Type.Warning, - "SavepointError", - SavepointUtils.createSavepointError( - savepointInfo, - resource.getSpec() - .getJob() - .getSavepointTriggerNonce()), - EventUtils.Component.Operator)); - - // We only need to observe latest checkpoint/savepoint for terminal jobs - if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) { - observeLatestSavepoint(savepointInfo, jobId, deployedConfig); + // If any manual or periodic savepoint is in progress, observe it + if (SavepointUtils.savepointInProgress(jobStatus)) { + observeTriggeredSavepoint(resource, jobId, deployedConfig); } - var currentLastSpPath = - Optional.ofNullable(savepointInfo.getLastSavepoint()) - .map(Savepoint::getLocation) - .orElse(null); - - // If the last savepoint information changes we need to patch the status - // to avoid losing this in case of an operator failure after the cluster was shut down - if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) { - LOG.info( - "Updating resource status after observing new last savepoint {}", - currentLastSpPath); - statusHelper.patchAndCacheStatus(resource); + // If job is in globally terminal state, observe last savepoint + if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) { + observeLatestSavepoint(savepointInfo, jobId, deployedConfig); } + + patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath); Review Comment: Do we have to put this in a finally block to ensure it is always done? 
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ########## @@ -58,9 +58,11 @@ public class KubernetesOperatorConfigOptions { "The interval for observing status for in-progress operations such as deployment and savepoints."); public static final ConfigOption<Duration> OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD = Review Comment: The variable name could also be refactored accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org