This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/release-1.0 by this push:
new afed126 [hotfix] Fix last checkpoint observe with empty history
afed126 is described below
commit afed126618afed2347a0375c94001bce325a761b
Author: Gyula Fora <[email protected]>
AuthorDate: Mon May 23 05:38:41 2022 +0200
[hotfix] Fix last checkpoint observe with empty history
---
.../operator/service/CheckpointHistoryWrapper.java | 46 +++++++++++++++--
.../kubernetes/operator/service/FlinkService.java | 57 +++++-----------------
.../kubernetes/operator/TestingClusterClient.java | 10 ++--
.../operator/service/FlinkServiceTest.java | 57 +++++++++++++++++++++-
4 files changed, 115 insertions(+), 55 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
index 04c1179..fc9c7e5 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.service;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -27,7 +28,14 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
import lombok.Data;
import lombok.NoArgsConstructor;
-import java.util.List;
+import java.util.Optional;
+
+import static
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS;
+import static
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED;
+import static
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_RESTORED;
+import static
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT;
+import static
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH;
+import static
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID;
/** Custom Response for handling checkpoint history in a multi-version
compatible way. */
@JsonIgnoreProperties(ignoreUnknown = true)
@@ -35,8 +43,38 @@ import java.util.List;
@NoArgsConstructor
public class CheckpointHistoryWrapper implements ResponseBody {
- public static final String FIELD_NAME_HISTORY = "history";
+ @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
+ private ObjectNode latestCheckpoints;
+
+ public Optional<String> getLatestCheckpointPath() {
+ if (latestCheckpoints == null) {
+ return Optional.empty();
+ }
+
+ var latestCheckpoint =
getCheckpointInfo(FIELD_NAME_RESTORED).orElse(null);
+
+ var completed = getCheckpointInfo(FIELD_NAME_COMPLETED).orElse(null);
+ if (latestCheckpoint == null || (completed != null && completed.f0 >
latestCheckpoint.f0)) {
+ latestCheckpoint = completed;
+ }
+ var savepoint = getCheckpointInfo(FIELD_NAME_SAVEPOINT).orElse(null);
+ if (latestCheckpoint == null || (savepoint != null && savepoint.f0 >
latestCheckpoint.f0)) {
+ latestCheckpoint = savepoint;
+ }
+
+ return Optional.ofNullable(latestCheckpoint).map(t -> t.f1);
+ }
- @JsonProperty(FIELD_NAME_HISTORY)
- private List<ObjectNode> history;
+ private Optional<Tuple2<Long, String>> getCheckpointInfo(String field) {
+ return Optional.ofNullable(latestCheckpoints.get(field))
+ .filter(
+ checkpoint ->
+ checkpoint.has(FIELD_NAME_ID)
+ &&
checkpoint.has(FIELD_NAME_EXTERNAL_PATH))
+ .map(
+ checkpoint ->
+ Tuple2.of(
+ checkpoint.get(FIELD_NAME_ID).asLong(),
+
checkpoint.get(FIELD_NAME_EXTERNAL_PATH).asText()));
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index d7c2756..304748e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -49,7 +49,6 @@ import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.client.JobStatusMessage;
import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.RestoreMode;
@@ -61,7 +60,6 @@ import
org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
-import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
@@ -103,7 +101,6 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -594,49 +591,19 @@ public class FlinkService {
.getSeconds(),
TimeUnit.SECONDS);
- var latestCheckpointOpt =
- checkpoints.getHistory().stream()
- .filter(
- cp ->
- CheckpointStatsStatus.valueOf(
- cp.get(
-
CheckpointStatistics
-
.FIELD_NAME_STATUS)
- .asText())
- ==
CheckpointStatsStatus.COMPLETED)
- .filter(
- cp ->
- !cp.get(
-
CheckpointStatistics
-
.CompletedCheckpointStatistics
-
.FIELD_NAME_EXTERNAL_PATH)
- .asText()
- .equals(
-
NonPersistentMetadataCheckpointStorageLocation
-
.EXTERNAL_POINTER))
- .max(
- Comparator.comparingLong(
- cp ->
-
cp.get(CheckpointStatistics.FIELD_NAME_ID)
- .asLong()))
- .map(
- cp ->
- new Savepoint(
- cp.get(
-
CheckpointStatistics
-
.CompletedCheckpointStatistics
-
.FIELD_NAME_TRIGGER_TIMESTAMP)
- .asLong(),
- cp.get(
-
CheckpointStatistics
-
.CompletedCheckpointStatistics
-
.FIELD_NAME_EXTERNAL_PATH)
- .asText()));
-
- if (!latestCheckpointOpt.isPresent()) {
- LOG.warn("Could not find any externally addressable
checkpoints.");
+ var latestCheckpointOpt = checkpoints.getLatestCheckpointPath();
+
+ if (latestCheckpointOpt.isPresent()
+ && latestCheckpointOpt
+ .get()
+ .equals(
+
NonPersistentMetadataCheckpointStorageLocation
+ .EXTERNAL_POINTER)) {
+ throw new DeploymentFailedException(
+ "Latest checkpoint not externally addressable, manual
recovery required.",
+ "CheckpointNotFound");
}
- return latestCheckpointOpt;
+ return latestCheckpointOpt.map(Savepoint::of);
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
index b5942e8..730d99b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -62,7 +62,7 @@ public class TestingClusterClient<T> extends
RestClusterClient<T> {
MessageParameters,
RequestBody,
CompletableFuture<ResponseBody>>
- triggerSavepointFunction =
+ requestProcessor =
(ignore1, ignore2, ignore) ->
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
@@ -94,14 +94,14 @@ public class TestingClusterClient<T> extends
RestClusterClient<T> {
this.stopWithSavepointFunction = stopWithSavepointFunction;
}
- public void setTriggerSavepointFunction(
+ public void setRequestProcessor(
TriFunction<
MessageHeaders<?, ?, ?>,
MessageParameters,
RequestBody,
CompletableFuture<ResponseBody>>
- triggerSavepointFunction) {
- this.triggerSavepointFunction = triggerSavepointFunction;
+ requestProcessor) {
+ this.requestProcessor = requestProcessor;
}
public void setListJobsFunction(
@@ -202,6 +202,6 @@ public class TestingClusterClient<T> extends
RestClusterClient<T> {
P extends ResponseBody>
CompletableFuture<P> sendRequest(M messageHeaders, U
messageParameters, R request) {
return (CompletableFuture<P>)
- triggerSavepointFunction.apply(messageHeaders,
messageParameters, request);
+ requestProcessor.apply(messageHeaders, messageParameters,
request);
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
index e0cb4ce..cd6d89e 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
@@ -50,6 +51,7 @@ import
io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import static
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
@@ -58,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
/** @link FlinkService unit tests */
@EnableKubernetesMockClient(crud = true)
@@ -177,7 +180,7 @@ public class FlinkServiceTest {
new CompletableFuture<>();
final String savepointPath = "file:///path/of/svp";
configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointPath);
- testingClusterClient.setTriggerSavepointFunction(
+ testingClusterClient.setRequestProcessor(
(headers, parameters, requestBody) -> {
triggerSavepointFuture.complete(
new Tuple3<>(
@@ -208,6 +211,58 @@ public class FlinkServiceTest {
assertFalse(triggerSavepointFuture.get().f2);
}
+ @Test
+ public void testGetLastCheckpoint() throws Exception {
+ ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+ var testingClusterClient =
+ new TestingClusterClient<>(configuration,
TestUtils.TEST_DEPLOYMENT_NAME);
+
+ String responseWithHistory =
+
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
[...]
+ String responseWithoutHistory =
+
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
[...]
+ String responseWithoutHistoryInternal =
+
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
[...]
+
+ var responseContainer = new ArrayList<CheckpointHistoryWrapper>();
+
+ testingClusterClient.setRequestProcessor(
+ (headers, parameters, requestBody) -> {
+ if (headers instanceof
CustomCheckpointingStatisticsHeaders) {
+ return
CompletableFuture.completedFuture(responseContainer.get(0));
+ }
+ fail("unknown request");
+ return null;
+ });
+
+ var flinkService = createFlinkService(testingClusterClient);
+
+ responseContainer.add(
+ objectMapper.readValue(responseWithHistory,
CheckpointHistoryWrapper.class));
+ var checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new
Configuration());
+ assertEquals(
+
"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96",
+ checkpointOpt.get().getLocation());
+
+ responseContainer.set(
+ 0, objectMapper.readValue(responseWithoutHistory,
CheckpointHistoryWrapper.class));
+ checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new
Configuration());
+ assertEquals(
+ "file:/flink-data/savepoints/savepoint-000000-5930e5326ca7",
+ checkpointOpt.get().getLocation());
+
+ responseContainer.set(
+ 0,
+ objectMapper.readValue(
+ responseWithoutHistoryInternal,
CheckpointHistoryWrapper.class));
+ try {
+ flinkService.getLastCheckpoint(new JobID(), new Configuration());
+ fail();
+ } catch (DeploymentFailedException dpe) {
+
+ }
+ }
+
@Test
public void testGetLastSavepointRestCompatibility() throws
JsonProcessingException {
ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();