Aitozi commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890243163


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
      * Observe the savepoint result based on the current savepoint info.
      *
-     * @param currentSavepointInfo the current savepoint info.
+     * @param resource the resource being observed
      * @param jobID the jobID of the observed job.
      * @param deployedConfig Deployed job config.
      * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.
      */
-    private Optional<String> observeTriggeredSavepointProgress(
-            SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) {
-        if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
-            LOG.debug("Savepoint not in progress");
-            return Optional.empty();
-        }
+    private void observeTriggeredSavepoint(
+            AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) {
+
+        var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
+
         LOG.info("Observing savepoint status.");
-        SavepointFetchResult savepointFetchResult =
+        var savepointFetchResult =
                 flinkService.fetchSavepointInfo(
-                        currentSavepointInfo.getTriggerId(), jobID, deployedConfig);
+                        savepointInfo.getTriggerId(), jobID, deployedConfig);
 
         if (savepointFetchResult.isPending()) {

Review Comment:
   If the savepoint is pending, the grace period is no longer checked, so the
   savepoint may now take longer than the grace period allows.
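
   A minimal sketch of the kind of check this comment is asking about, assuming
   the trigger time of the savepoint is recorded somewhere. The class and method
   names below are hypothetical and are not taken from the PR:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration only; not the operator's actual code.
class GracePeriodSketch {

    /** Returns true if a still-pending savepoint has outlived the grace period. */
    static boolean gracePeriodExceeded(Instant triggeredAt, Duration gracePeriod, Instant now) {
        return Duration.between(triggeredAt, now).compareTo(gracePeriod) > 0;
    }

    public static void main(String[] args) {
        // Assumed values for the demonstration.
        Instant triggeredAt = Instant.now().minusSeconds(30);
        Duration gracePeriod = Duration.ofSeconds(10);

        if (gracePeriodExceeded(triggeredAt, gracePeriod, Instant.now())) {
            System.out.println("Pending savepoint exceeded the grace period; report an error.");
        } else {
            System.out.println("Pending savepoint still within the grace period; keep waiting.");
        }
    }
}
```

   Something along these lines could run inside the pending branch so that a
   savepoint that never completes is eventually reported.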



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
      * Observe the savepoint result based on the current savepoint info.
      *
-     * @param currentSavepointInfo the current savepoint info.
+     * @param resource the resource being observed
      * @param jobID the jobID of the observed job.
      * @param deployedConfig Deployed job config.
      * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.

Review Comment:
   The method has no return value now, so this `@return` tag is stale.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);

Review Comment:
   Do we have to put this in a finally block to ensure it is always done?
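
   For illustration, a self-contained sketch of the try/finally pattern the
   question refers to. The class, field, and step names are hypothetical
   stand-ins and are not the operator's real API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; records the order in which steps run.
class FinallyPatchSketch {

    final List<String> calls = new ArrayList<>();

    /** Runs the observe step and always runs the patch step afterwards. */
    void observe(boolean failDuringObserve) {
        try {
            calls.add("observe");
            if (failDuringObserve) {
                throw new IllegalStateException("simulated observer failure");
            }
        } finally {
            // Executed whether or not the observe step threw.
            calls.add("patch");
        }
    }

    public static void main(String[] args) {
        FinallyPatchSketch sketch = new FinallyPatchSketch();
        try {
            sketch.observe(true);
        } catch (IllegalStateException e) {
            System.out.println("Observe failed: " + e.getMessage());
        }
        System.out.println("Steps executed: " + sketch.calls);
    }
}
```

   The exception still propagates to the caller; the finally block only
   guarantees that the patch step runs before it does.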



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -58,9 +58,11 @@ public class KubernetesOperatorConfigOptions {
                             "The interval for observing status for in-progress operations such as deployment and savepoints.");
 
     public static final ConfigOption<Duration> OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD =

Review Comment:
   The variable name could also be refactored accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
