Aitozi commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890269122


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
      * Observe the savepoint result based on the current savepoint info.
      *
-     * @param currentSavepointInfo the current savepoint info.
+     * @param resource the resource being observed
      * @param jobID the jobID of the observed job.
      * @param deployedConfig Deployed job config.
      * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.
      */
-    private Optional<String> observeTriggeredSavepointProgress(
-            SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) {
-        if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
-            LOG.debug("Savepoint not in progress");
-            return Optional.empty();
-        }
+    private void observeTriggeredSavepoint(
+            AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) {
+
+        var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
+
         LOG.info("Observing savepoint status.");
-        SavepointFetchResult savepointFetchResult =
+        var savepointFetchResult =
                 flinkService.fetchSavepointInfo(
-                        currentSavepointInfo.getTriggerId(), jobID, deployedConfig);
+                        savepointInfo.getTriggerId(), jobID, deployedConfig);
 
         if (savepointFetchResult.isPending()) {
-            if (SavepointUtils.gracePeriodEnded(
-                    configManager.getOperatorConfiguration(), currentSavepointInfo)) {
-                String errorMsg =
-                        "Savepoint operation timed out after "
-                                + configManager
-                                        .getOperatorConfiguration()
-                                        .getSavepointTriggerGracePeriod();
-                currentSavepointInfo.resetTrigger();
-                LOG.error(errorMsg);
-                return Optional.of(errorMsg);
-            } else {
-                LOG.info("Savepoint operation not finished yet, waiting within grace period...");
-                return Optional.empty();
-            }
+            LOG.info("Savepoint operation not finished yet...");
+            return;
         }
 
         if (savepointFetchResult.getError() != null) {
-            currentSavepointInfo.resetTrigger();
-            return Optional.of(savepointFetchResult.getError());
+            var err = savepointFetchResult.getError();
+            if (SavepointUtils.gracePeriodEnded(deployedConfig, savepointInfo)) {
+                LOG.error(
+                        "Savepoint attempt failed after grace period. Won't be retried again: "
+                                + err);
+                ReconciliationUtils.updateLastReconciledSavepointTrigger(savepointInfo, resource);

Review Comment:
   The function name `updateLastReconciledSavepointTrigger` is a bit confusing. 
It reads like it has the semantics of `triggerNonceFinished` (regardless of 
whether the savepoint succeeded or failed). Also, I don't think it needs to 
touch the common status's error.
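
   A minimal sketch of that separation, for illustration only. Every class, 
field, and method name below is a hypothetical, simplified stand-in and not 
the operator's actual API; the point is only that finishing a trigger nonce 
and updating the common status error can be two independent steps.

```java
// Hypothetical, simplified stand-ins for the operator's status classes.
// None of these names are taken from the PR.
public class SavepointTriggerSketch {

    static class SavepointState {
        String triggerId;              // non-null while a savepoint is in progress
        Long lastFinishedTriggerNonce; // nonce of the last finished attempt
    }

    static class ResourceStatus {
        String error; // common status error, managed elsewhere
        final SavepointState savepoint = new SavepointState();
    }

    /**
     * Marks the given trigger nonce as finished, whether the attempt
     * succeeded or failed. Deliberately leaves status.error untouched.
     */
    static void markTriggerNonceFinished(ResourceStatus status, Long specNonce) {
        status.savepoint.triggerId = null;
        status.savepoint.lastFinishedTriggerNonce = specNonce;
    }

    public static void main(String[] args) {
        ResourceStatus status = new ResourceStatus();
        status.error = "previous deployment error";
        status.savepoint.triggerId = "trigger-1";

        markTriggerNonceFinished(status, 42L);

        // The trigger is cleared and the nonce recorded; the error is unchanged.
        System.out.println(status.savepoint.triggerId);
        System.out.println(status.savepoint.lastFinishedTriggerNonce);
        System.out.println(status.error);
    }
}
```

   With this shape, the caller that observes the failure decides separately 
whether the common status error should change.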



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

