This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c531b370 [FLINK-32111] Add Null check for checkpoints history obj
c531b370 is described below
commit c531b3701a5e1e7ea51d37143979c27f1c88f78f
Author: Tamir Sagi <[email protected]>
AuthorDate: Fri May 19 18:43:15 2023 +0300
[FLINK-32111] Add Null check for checkpoints history obj
---
docs/content/docs/operations/plugins.md | 2 +-
.../operator/service/CheckpointHistoryWrapper.java | 2 +-
.../operator/service/NativeFlinkServiceTest.java | 23 ++++++++++++++++++++++
3 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/docs/content/docs/operations/plugins.md
b/docs/content/docs/operations/plugins.md
index efbf34e7..b25f0559 100644
--- a/docs/content/docs/operations/plugins.md
+++ b/docs/content/docs/operations/plugins.md
@@ -107,7 +107,7 @@ Similar to custom validator implementations, resource
listeners are loaded via t
In order to enable your custom `FlinkResourceListener` you need to:
1. Implement the interface
- 2. Add your listener class to
`org.apache.flink.kubernetes.operator.listener.FlinkResourceListener` in
`META-INF/services`
+ 2. Add your listener class to
`org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener` in
`META-INF/services`
3. Package your JAR and add it to the plugins directory of your operator
image (`/opt/flink/plugins`)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
index f53d0742..99a9abcd 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
@@ -57,7 +57,7 @@ public class CheckpointHistoryWrapper implements ResponseBody
{
private ArrayNode history;
public Optional<PendingCheckpointInfo> getInProgressCheckpoint() {
- if (history.isEmpty()) {
+ if (history == null || history.isEmpty()) {
return Optional.empty();
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index d7239018..805cf582 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -66,11 +66,13 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -307,6 +309,27 @@ public class NativeFlinkServiceTest {
objectMapper.readValue(flink15Response,
CheckpointHistoryWrapper.class);
}
+ @Test
+ public void testGetInProgressCheckpointsFromResponseWithoutHistoryDetails()
+ throws JsonProcessingException {
+ ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+ String response =
+
"{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0}}";
+ var checkpointHistoryWrapper =
+ objectMapper.readValue(response,
CheckpointHistoryWrapper.class);
+ Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>
optionalPendingCheckpointInfo =
+
assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
+ assertTrue(optionalPendingCheckpointInfo.isEmpty());
+ }
+
+ @Test
+ public void testGetInProgressCheckpointsWithoutHistory() {
+ CheckpointHistoryWrapper checkpointHistoryWrapper = new
CheckpointHistoryWrapper();
+ Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>
optionalPendingCheckpointInfo =
+
assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
+ assertTrue(optionalPendingCheckpointInfo.isEmpty());
+ }
+
@Test
public void testClusterInfoRestCompatibility() throws
JsonProcessingException {
ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();