This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/release-1.0 by this push:
     new afed126  [hotfix] Fix last checkpoint observe with empty history
afed126 is described below

commit afed126618afed2347a0375c94001bce325a761b
Author: Gyula Fora <[email protected]>
AuthorDate: Mon May 23 05:38:41 2022 +0200

    [hotfix] Fix last checkpoint observe with empty history
---
 .../operator/service/CheckpointHistoryWrapper.java | 46 +++++++++++++++--
 .../kubernetes/operator/service/FlinkService.java  | 57 +++++-----------------
 .../kubernetes/operator/TestingClusterClient.java  | 10 ++--
 .../operator/service/FlinkServiceTest.java         | 57 +++++++++++++++++++++-
 4 files changed, 115 insertions(+), 55 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
index 04c1179..fc9c7e5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.kubernetes.operator.service;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -27,7 +28,14 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS;
+import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED;
+import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_RESTORED;
+import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT;
+import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH;
+import static 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID;
 
 /** Custom Response for handling checkpoint history in a multi-version 
compatible way. */
 @JsonIgnoreProperties(ignoreUnknown = true)
@@ -35,8 +43,38 @@ import java.util.List;
 @NoArgsConstructor
 public class CheckpointHistoryWrapper implements ResponseBody {
 
-    public static final String FIELD_NAME_HISTORY = "history";
+    @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
+    private ObjectNode latestCheckpoints;
+
+    public Optional<String> getLatestCheckpointPath() {
+        if (latestCheckpoints == null) {
+            return Optional.empty();
+        }
+
+        var latestCheckpoint = 
getCheckpointInfo(FIELD_NAME_RESTORED).orElse(null);
+
+        var completed = getCheckpointInfo(FIELD_NAME_COMPLETED).orElse(null);
+        if (latestCheckpoint == null || (completed != null && completed.f0 > 
latestCheckpoint.f0)) {
+            latestCheckpoint = completed;
+        }
+        var savepoint = getCheckpointInfo(FIELD_NAME_SAVEPOINT).orElse(null);
+        if (latestCheckpoint == null || (savepoint != null && savepoint.f0 > 
latestCheckpoint.f0)) {
+            latestCheckpoint = savepoint;
+        }
+
+        return Optional.ofNullable(latestCheckpoint).map(t -> t.f1);
+    }
 
-    @JsonProperty(FIELD_NAME_HISTORY)
-    private List<ObjectNode> history;
+    private Optional<Tuple2<Long, String>> getCheckpointInfo(String field) {
+        return Optional.ofNullable(latestCheckpoints.get(field))
+                .filter(
+                        checkpoint ->
+                                checkpoint.has(FIELD_NAME_ID)
+                                        && 
checkpoint.has(FIELD_NAME_EXTERNAL_PATH))
+                .map(
+                        checkpoint ->
+                                Tuple2.of(
+                                        checkpoint.get(FIELD_NAME_ID).asLong(),
+                                        
checkpoint.get(FIELD_NAME_EXTERNAL_PATH).asText()));
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index d7c2756..304748e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -49,7 +49,6 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.runtime.jobgraph.RestoreMode;
@@ -61,7 +60,6 @@ import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.TriggerId;
-import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
@@ -103,7 +101,6 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -594,49 +591,19 @@ public class FlinkService {
                                     .getSeconds(),
                             TimeUnit.SECONDS);
 
-            var latestCheckpointOpt =
-                    checkpoints.getHistory().stream()
-                            .filter(
-                                    cp ->
-                                            CheckpointStatsStatus.valueOf(
-                                                            cp.get(
-                                                                            
CheckpointStatistics
-                                                                               
     .FIELD_NAME_STATUS)
-                                                                    .asText())
-                                                    == 
CheckpointStatsStatus.COMPLETED)
-                            .filter(
-                                    cp ->
-                                            !cp.get(
-                                                            
CheckpointStatistics
-                                                                    
.CompletedCheckpointStatistics
-                                                                    
.FIELD_NAME_EXTERNAL_PATH)
-                                                    .asText()
-                                                    .equals(
-                                                            
NonPersistentMetadataCheckpointStorageLocation
-                                                                    
.EXTERNAL_POINTER))
-                            .max(
-                                    Comparator.comparingLong(
-                                            cp ->
-                                                    
cp.get(CheckpointStatistics.FIELD_NAME_ID)
-                                                            .asLong()))
-                            .map(
-                                    cp ->
-                                            new Savepoint(
-                                                    cp.get(
-                                                                    
CheckpointStatistics
-                                                                            
.CompletedCheckpointStatistics
-                                                                            
.FIELD_NAME_TRIGGER_TIMESTAMP)
-                                                            .asLong(),
-                                                    cp.get(
-                                                                    
CheckpointStatistics
-                                                                            
.CompletedCheckpointStatistics
-                                                                            
.FIELD_NAME_EXTERNAL_PATH)
-                                                            .asText()));
-
-            if (!latestCheckpointOpt.isPresent()) {
-                LOG.warn("Could not find any externally addressable 
checkpoints.");
+            var latestCheckpointOpt = checkpoints.getLatestCheckpointPath();
+
+            if (latestCheckpointOpt.isPresent()
+                    && latestCheckpointOpt
+                            .get()
+                            .equals(
+                                    
NonPersistentMetadataCheckpointStorageLocation
+                                            .EXTERNAL_POINTER)) {
+                throw new DeploymentFailedException(
+                        "Latest checkpoint not externally addressable, manual 
recovery required.",
+                        "CheckpointNotFound");
             }
-            return latestCheckpointOpt;
+            return latestCheckpointOpt.map(Savepoint::of);
         }
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
index b5942e8..730d99b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -62,7 +62,7 @@ public class TestingClusterClient<T> extends RestClusterClient<T> {
                     MessageParameters,
                     RequestBody,
                     CompletableFuture<ResponseBody>>
-            triggerSavepointFunction =
+            requestProcessor =
                     (ignore1, ignore2, ignore) ->
                             
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
 
@@ -94,14 +94,14 @@ public class TestingClusterClient<T> extends RestClusterClient<T> {
         this.stopWithSavepointFunction = stopWithSavepointFunction;
     }
 
-    public void setTriggerSavepointFunction(
+    public void setRequestProcessor(
             TriFunction<
                             MessageHeaders<?, ?, ?>,
                             MessageParameters,
                             RequestBody,
                             CompletableFuture<ResponseBody>>
-                    triggerSavepointFunction) {
-        this.triggerSavepointFunction = triggerSavepointFunction;
+                    requestProcessor) {
+        this.requestProcessor = requestProcessor;
     }
 
     public void setListJobsFunction(
@@ -202,6 +202,6 @@ public class TestingClusterClient<T> extends RestClusterClient<T> {
                     P extends ResponseBody>
             CompletableFuture<P> sendRequest(M messageHeaders, U 
messageParameters, R request) {
         return (CompletableFuture<P>)
-                triggerSavepointFunction.apply(messageHeaders, 
messageParameters, request);
+                requestProcessor.apply(messageHeaders, messageParameters, 
request);
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
index e0cb4ce..cd6d89e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
@@ -50,6 +51,7 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
@@ -58,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** @link FlinkService unit tests */
 @EnableKubernetesMockClient(crud = true)
@@ -177,7 +180,7 @@ public class FlinkServiceTest {
                 new CompletableFuture<>();
         final String savepointPath = "file:///path/of/svp";
         configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointPath);
-        testingClusterClient.setTriggerSavepointFunction(
+        testingClusterClient.setRequestProcessor(
                 (headers, parameters, requestBody) -> {
                     triggerSavepointFuture.complete(
                             new Tuple3<>(
@@ -208,6 +211,58 @@ public class FlinkServiceTest {
         assertFalse(triggerSavepointFuture.get().f2);
     }
 
+    @Test
+    public void testGetLastCheckpoint() throws Exception {
+        ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+        var testingClusterClient =
+                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
+
+        String responseWithHistory =
+                
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
 [...]
+        String responseWithoutHistory =
+                
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
 [...]
+        String responseWithoutHistoryInternal =
+                
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
 [...]
+
+        var responseContainer = new ArrayList<CheckpointHistoryWrapper>();
+
+        testingClusterClient.setRequestProcessor(
+                (headers, parameters, requestBody) -> {
+                    if (headers instanceof 
CustomCheckpointingStatisticsHeaders) {
+                        return 
CompletableFuture.completedFuture(responseContainer.get(0));
+                    }
+                    fail("unknown request");
+                    return null;
+                });
+
+        var flinkService = createFlinkService(testingClusterClient);
+
+        responseContainer.add(
+                objectMapper.readValue(responseWithHistory, 
CheckpointHistoryWrapper.class));
+        var checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new 
Configuration());
+        assertEquals(
+                
"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96",
+                checkpointOpt.get().getLocation());
+
+        responseContainer.set(
+                0, objectMapper.readValue(responseWithoutHistory, 
CheckpointHistoryWrapper.class));
+        checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new 
Configuration());
+        assertEquals(
+                "file:/flink-data/savepoints/savepoint-000000-5930e5326ca7",
+                checkpointOpt.get().getLocation());
+
+        responseContainer.set(
+                0,
+                objectMapper.readValue(
+                        responseWithoutHistoryInternal, 
CheckpointHistoryWrapper.class));
+        try {
+            flinkService.getLastCheckpoint(new JobID(), new Configuration());
+            fail();
+        } catch (DeploymentFailedException dpe) {
+
+        }
+    }
+
     @Test
     public void testGetLastSavepointRestCompatibility() throws 
JsonProcessingException {
         ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

Reply via email to