gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886323405


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java:
##########
@@ -17,40 +17,146 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
 
+import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
 
 /** Savepoint utilities. */
 public class SavepointUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SavepointUtils.class);
+
     public static boolean savepointInProgress(JobStatus jobStatus) {
         return StringUtils.isNotEmpty(jobStatus.getSavepointInfo().getTriggerId());
     }
 
-    public static boolean shouldTriggerSavepoint(JobSpec jobSpec, FlinkDeploymentStatus status) {
-        if (savepointInProgress(status.getJobStatus())) {
+    /**
+     * Triggers any pending manual or periodic savepoints and updates the status accordingly.
+     *
+     * @param flinkService {@link FlinkService} used to trigger savepoints
+     * @param resource Resource that should be savepointed
+     * @param conf Observe config of the resource
+     * @return True if a savepoint was triggered
+     */
+    public static boolean triggerSavepointIfNeeded(
+            FlinkService flinkService, AbstractFlinkResource<?, ?> resource, Configuration conf)
+            throws Exception {
+
+        Optional<SavepointTriggerType> triggerOpt = shouldTriggerSavepoint(resource, conf);
+        if (triggerOpt.isEmpty()) {
             return false;
         }
-        return jobSpec.getSavepointTriggerNonce() != null
-                && !jobSpec.getSavepointTriggerNonce()
-                        .equals(
-                                status.getReconciliationStatus()
-                                        .deserializeLastReconciledSpec()
-                                        .getJob()
-                                        .getSavepointTriggerNonce());
+
+        var triggerType = triggerOpt.get();
+        flinkService.triggerSavepoint(
+                resource.getStatus().getJobStatus().getJobId(),
+                triggerType,
+                resource.getStatus().getJobStatus().getSavepointInfo(),
+                conf);
+
+        if (triggerType == SavepointTriggerType.MANUAL) {
+            ReconciliationUtils.updateSavepointReconciliationSuccess(resource);
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether savepoint should be triggered based on the current status and spec and if yes,
+     * returns the correct {@link SavepointTriggerType}.
+     *
+     * <p>This logic is responsible for both manual and periodic savepoint triggering.
+     *
+     * @param resource Resource to be savepointed
+     * @param conf Observe configuration of the resource
+     * @return Optional @{@link SavepointTriggerType}
+     */
+    @VisibleForTesting
+    protected static Optional<SavepointTriggerType> shouldTriggerSavepoint(
+            AbstractFlinkResource<?, ?> resource, Configuration conf) {
+
+        var status = resource.getStatus();
+        var jobSpec = resource.getSpec().getJob();
+        var jobStatus = status.getJobStatus();
+
+        if (!ReconciliationUtils.isJobRunning(status) || savepointInProgress(jobStatus)) {
+            return Optional.empty();
+        }
+
+        var triggerNonceChanged =
+                jobSpec.getSavepointTriggerNonce() != null
+                        && !jobSpec.getSavepointTriggerNonce()
+                                .equals(
+                                        status.getReconciliationStatus()
+                                                .deserializeLastReconciledSpec()
+                                                .getJob()
+                                                .getSavepointTriggerNonce());
+        if (triggerNonceChanged) {
+            return Optional.of(SavepointTriggerType.MANUAL);
+        }
+
+        var savepointInterval =
+                conf.getOptional(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)
+                        .map(Duration::toMillis)
+                        .orElse(Long.MAX_VALUE);
+
+        var lastPeriodicTriggerTs =
+                jobStatus.getSavepointInfo().getLastPeriodicSavepointTimestamp();
+
+        // When the resource is first created/periodic savepointing enabled we have to compare
+        // against the creation timestamp for triggering the first periodic savepoint
+        if (lastPeriodicTriggerTs.equals(0L)) {
+            lastPeriodicTriggerTs =
+                    Instant.parse(resource.getMetadata().getCreationTimestamp()).toEpochMilli();
+        }
+
+        long timeElapsed = System.currentTimeMillis() - lastPeriodicTriggerTs;
+        if (timeElapsed >= savepointInterval) {
+            LOG.info(
+                    "Triggering new periodic savepoint after {} seconds",
+                    Duration.ofMillis(timeElapsed).toSeconds());
+            return Optional.of(SavepointTriggerType.PERIODIC);
+        }
+        return Optional.empty();
     }
 
     public static boolean gracePeriodEnded(
             FlinkOperatorConfiguration configuration, SavepointInfo savepointInfo) {
-        Duration gracePeriod = configuration.getSavepointTriggerGracePeriod();
-        long triggerTimestamp = savepointInfo.getTriggerTimestamp();
-        return (System.currentTimeMillis() - triggerTimestamp) > gracePeriod.toMillis();
+        var elapsed = System.currentTimeMillis() - savepointInfo.getTriggerTimestamp();
+        return elapsed > configuration.getSavepointTriggerGracePeriod().toMillis();
+    }
+
+    public static void resetTriggerIfJobNotRunning(
+            KubernetesClient client, AbstractFlinkResource<?, ?> resource) {
+        var status = resource.getStatus();
+        var jobStatus = status.getJobStatus();
+        if (!ReconciliationUtils.isJobRunning(status)
+                && SavepointUtils.savepointInProgress(jobStatus)) {
+            jobStatus.getSavepointInfo().resetTrigger();
+            LOG.error("Job is not running, cancelling savepoint operation");
+            EventUtils.createOrUpdateEvent(
+                    client,
+                    resource,
+                    EventUtils.Type.Warning,
+                    "SavepointError",
+                    "Savepoint failed for savepointTriggerNonce: "
+                            + resource.getSpec().getJob().getSavepointTriggerNonce(),

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to