This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit b545bf35451b1cbf4337ef36beb30405547e52ea
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Jul 13 10:23:38 2023 +0200

    [hotfix][tests] Improve Flink service tests
---
 .../operator/service/AbstractFlinkService.java     |  22 +-
 .../flink/kubernetes/operator/TestUtils.java       |   5 +-
 .../kubernetes/operator/TestingClusterClient.java  |  57 +-
 .../kubernetes/operator/TestingRestClient.java     |  73 ++
 .../operator/service/AbstractFlinkServiceTest.java | 873 +++++++++++++++++++++
 .../operator/service/NativeFlinkServiceTest.java   | 586 +++-----------
 .../operator/service/RestResponseTest.java         |  83 ++
 7 files changed, 1148 insertions(+), 551 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 843814f9..88cb42c5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -79,10 +79,10 @@ import 
org.apache.flink.runtime.webmonitor.handlers.JarDeleteMessageParameters;
 import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
 import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
-import org.apache.flink.runtime.webmonitor.handlers.JarRunResponseBody;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -220,7 +220,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         try (var clusterClient = getClusterClient(config)) {
             uri = URI.create(clusterClient.getWebInterfaceURL());
         } catch (Exception ex) {
-            throw new RuntimeException(ex);
+            throw new FlinkRuntimeException(ex);
         }
         SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), 
uri.getPort());
         Socket socket = new Socket();
@@ -660,7 +660,8 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                 conf, clusterId, (c, e) -> new 
StandaloneClientHAServices(restServerAddress));
     }
 
-    private JarRunResponseBody runJar(
+    @VisibleForTesting
+    protected void runJar(
             JobSpec job,
             JobID jobID,
             JarUploadResponseBody response,
@@ -688,7 +689,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                                     ? conf.toMap()
                                     : null);
             LOG.info("Submitting job: {} to session cluster.", 
jobID.toHexString());
-            return clusterClient
+            clusterClient
                     .sendRequest(headers, parameters, runRequestBody)
                     .get(operatorConfig.getFlinkClientTimeout().toSeconds(), 
TimeUnit.SECONDS);
         } catch (Exception e) {
@@ -716,8 +717,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                         operatorConfig.getFlinkServiceHostOverride(),
                         
ExternalServiceDecorator.getNamespacedExternalServiceName(
                                 clusterId, namespace));
-        try (RestClient restClient = new RestClient(conf, executorService)) {
-            // TODO add method in flink#RestClusterClient to support upload 
jar.
+        try (var restClient = getRestClient(conf)) {
             return restClient
                     .sendRequest(
                             host,
@@ -735,6 +735,11 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         }
     }
 
+    @VisibleForTesting
+    protected RestClient getRestClient(Configuration conf) throws 
ConfigurationException {
+        return new RestClient(conf, executorService);
+    }
+
     private String findJarURI(JobSpec jobSpec) {
         if (jobSpec.getJarURI() != null) {
             return jobSpec.getJarURI();
@@ -743,8 +748,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         }
     }
 
-    @VisibleForTesting
-    protected void deleteJar(Configuration conf, String jarId) {
+    private void deleteJar(Configuration conf, String jarId) {
         LOG.debug("Deleting the jar: {}", jarId);
         try (var clusterClient = getClusterClient(conf)) {
             JarDeleteHeaders headers = JarDeleteHeaders.getInstance();
@@ -910,7 +914,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         }
     }
 
-    public TaskManagersInfo getTaskManagersInfo(Configuration conf) throws 
Exception {
+    private TaskManagersInfo getTaskManagersInfo(Configuration conf) throws 
Exception {
         try (var clusterClient = getClusterClient(conf)) {
             return clusterClient
                     .sendRequest(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 79e8f71c..a3b1b101 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -292,7 +292,10 @@ public class TestUtils extends BaseTestUtils {
     }
 
     public static Stream<Arguments> flinkVersions() {
-        return Stream.of(arguments(FlinkVersion.v1_14), 
arguments(FlinkVersion.v1_15));
+        return Stream.of(
+                arguments(FlinkVersion.v1_14),
+                arguments(FlinkVersion.v1_15),
+                arguments(FlinkVersion.v1_17));
     }
 
     public static FlinkDeployment createCanaryDeployment() {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
index 413a6943..05d7e5cb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -39,6 +39,9 @@ import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.util.function.TriFunction;
 
+import lombok.Getter;
+import lombok.Setter;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -48,17 +51,24 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-/** Testing ClusterClient used implementation. */
+/** Testing ClusterClient implementation. */
 public class TestingClusterClient<T> extends RestClusterClient<T> {
 
+    @Setter
     private Function<JobID, CompletableFuture<Acknowledge>> cancelFunction =
             ignore -> CompletableFuture.completedFuture(Acknowledge.get());
+
+    @Setter
     private TriFunction<JobID, Boolean, String, CompletableFuture<String>>
             stopWithSavepointFunction =
                     (ignore1, ignore2, savepointPath) ->
                             CompletableFuture.completedFuture(savepointPath);
+
+    @Setter
     private TriFunction<JobID, SavepointFormatType, String, 
CompletableFuture<String>>
             stopWithSavepointFormat;
+
+    @Setter
     private TriFunction<
                     MessageHeaders<?, ?, ?>,
                     MessageParameters,
@@ -68,17 +78,19 @@ public class TestingClusterClient<T> extends 
RestClusterClient<T> {
                     (ignore1, ignore2, ignore) ->
                             
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
 
+    @Setter
     private Supplier<CompletableFuture<Collection<JobStatusMessage>>> 
listJobsFunction =
             () -> {
                 throw new UnsupportedOperationException();
             };
 
+    @Setter
     private Function<JobID, CompletableFuture<JobResult>> 
requestResultFunction =
             jobID ->
                     CompletableFuture.completedFuture(
                             new 
JobResult.Builder().jobId(jobID).netRuntime(1).build());
 
-    private final T clusterId;
+    @Getter private final T clusterId;
 
     public TestingClusterClient(Configuration configuration, T clusterId) 
throws Exception {
         super(configuration, clusterId, (c, e) -> new 
StandaloneClientHAServices("localhost"));
@@ -89,47 +101,6 @@ public class TestingClusterClient<T> extends 
RestClusterClient<T> {
         this(configuration, (T) 
configuration.get(KubernetesConfigOptions.CLUSTER_ID));
     }
 
-    public void setCancelFunction(Function<JobID, 
CompletableFuture<Acknowledge>> cancelFunction) {
-        this.cancelFunction = cancelFunction;
-    }
-
-    public void setStopWithSavepointFunction(
-            TriFunction<JobID, Boolean, String, CompletableFuture<String>>
-                    stopWithSavepointFunction) {
-        this.stopWithSavepointFunction = stopWithSavepointFunction;
-    }
-
-    public void setStopWithSavepointFormat(
-            TriFunction<JobID, SavepointFormatType, String, 
CompletableFuture<String>>
-                    stopWithSavepointFormat) {
-        this.stopWithSavepointFormat = stopWithSavepointFormat;
-    }
-
-    public void setRequestProcessor(
-            TriFunction<
-                            MessageHeaders<?, ?, ?>,
-                            MessageParameters,
-                            RequestBody,
-                            CompletableFuture<ResponseBody>>
-                    requestProcessor) {
-        this.requestProcessor = requestProcessor;
-    }
-
-    public void setListJobsFunction(
-            Supplier<CompletableFuture<Collection<JobStatusMessage>>> 
listJobsFunction) {
-        this.listJobsFunction = listJobsFunction;
-    }
-
-    public void setRequestResultFunction(
-            Function<JobID, CompletableFuture<JobResult>> 
requestResultFunction) {
-        this.requestResultFunction = requestResultFunction;
-    }
-
-    @Override
-    public T getClusterId() {
-        return clusterId;
-    }
-
     @Override
     public Configuration getFlinkConfiguration() {
         throw new UnsupportedOperationException();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingRestClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingRestClient.java
new file mode 100644
index 00000000..49558859
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingRestClient.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.FileUpload;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.function.TriFunction;
+
+import lombok.Setter;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/** Testing RestClient implementation. */
+public class TestingRestClient extends RestClient {
+
+    @Setter
+    private TriFunction<
+                    MessageHeaders<?, ?, ?>,
+                    MessageParameters,
+                    RequestBody,
+                    CompletableFuture<ResponseBody>>
+            requestProcessor =
+                    (ignore1, ignore2, ignore) ->
+                            
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+
+    public TestingRestClient(Configuration conf) throws ConfigurationException 
{
+        super(conf, Executors.directExecutor());
+    }
+
+    @Override
+    public <
+                    M extends MessageHeaders<R, P, U>,
+                    U extends MessageParameters,
+                    R extends RequestBody,
+                    P extends ResponseBody>
+            CompletableFuture<P> sendRequest(
+                    String targetAddress,
+                    int targetPort,
+                    M messageHeaders,
+                    U messageParameters,
+                    R request,
+                    Collection<FileUpload> fileUploads,
+                    RestAPIVersion<? extends RestAPIVersion<?>> apiVersion) {
+        return (CompletableFuture<P>)
+                requestProcessor.apply(messageHeaders, messageParameters, 
request);
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
new file mode 100644
index 00000000..c469679d
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingClusterClient;
+import org.apache.flink.kubernetes.operator.TestingRestClient;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.status.JobStatus;
+import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
+import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.function.TriFunction;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.fabric8.kubernetes.api.model.DeletionPropagation;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** {@link FlinkService} unit tests. */
+@EnableKubernetesMockClient(crud = true)
+public class AbstractFlinkServiceTest {
+
+    @TempDir Path tempDir;
+    File testJar;
+
+    private KubernetesClient client;
+    private final Configuration configuration = new Configuration();
+
+    private final FlinkConfigManager configManager = new 
FlinkConfigManager(configuration);
+    private FlinkOperatorConfiguration operatorConfig;
+    private ExecutorService executorService;
+
+    private ArtifactManager artifactManager;
+
+    @BeforeEach
+    public void setup() {
+        configuration.set(KubernetesConfigOptions.CLUSTER_ID, 
TestUtils.TEST_DEPLOYMENT_NAME);
+        configuration.set(KubernetesConfigOptions.NAMESPACE, 
TestUtils.TEST_NAMESPACE);
+        configuration.set(FLINK_VERSION, FlinkVersion.v1_18);
+        operatorConfig = 
FlinkOperatorConfiguration.fromConfiguration(configuration);
+        executorService = Executors.newDirectExecutorService();
+        testJar = tempDir.resolve("test.jar").toFile();
+        artifactManager =
+                new ArtifactManager(configManager) {
+                    @Override
+                    public File fetch(
+                            String jarURI, Configuration flinkConfiguration, 
String targetDirStr)
+                            throws IOException {
+                        Files.writeString(testJar.toPath(), "test");
+                        return testJar;
+                    }
+                };
+    }
+
+    @ParameterizedTest
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void sessionJobSubmissionTest(FlinkVersion flinkVersion) throws 
Exception {
+        var jarRuns = new ArrayList<JarRunRequestBody>();
+        var flinkService =
+                getTestingService(
+                        (h, p, b) -> {
+                            if (b instanceof JarRunRequestBody) {
+                                jarRuns.add((JarRunRequestBody) b);
+                                return CompletableFuture.completedFuture(null);
+                            } else if (h instanceof JarUploadHeaders) {
+                                return CompletableFuture.completedFuture(
+                                        new JarUploadResponseBody("test"));
+                            } else if (h instanceof JarDeleteHeaders) {
+                                return CompletableFuture.completedFuture(null);
+                            }
+
+                            throw new UnsupportedOperationException("Unknown 
request");
+                        });
+        var session = TestUtils.buildSessionCluster(flinkVersion);
+        session.getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(session.getSpec(), session);
+
+        var job = TestUtils.buildSessionJob();
+        var deployConf = configManager.getSessionJobConfig(session, 
job.getSpec());
+        flinkService.submitJobToSessionCluster(job.getMetadata(), 
job.getSpec(), deployConf, null);
+
+        // Make sure that deploy conf was passed to jar run
+        if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_16)) {
+            assertEquals(deployConf.toMap(), 
jarRuns.get(0).getFlinkConfiguration().toMap());
+        } else {
+            
assertTrue(jarRuns.get(0).getFlinkConfiguration().toMap().isEmpty());
+        }
+    }
+
+    @Test
+    public void jarRunErrorHandlingTest() throws Exception {
+        List<JarRunRequestBody> jarRuns = new ArrayList<>();
+        AtomicBoolean deleted = new AtomicBoolean(false);
+        var flinkService =
+                getTestingService(
+                        (h, p, b) -> {
+                            if (b instanceof JarRunRequestBody) {
+                                jarRuns.add((JarRunRequestBody) b);
+                                return CompletableFuture.failedFuture(
+                                        new Exception("RunException"));
+                            } else if (h instanceof JarDeleteHeaders) {
+                                deleted.set(true);
+                                return CompletableFuture.failedFuture(
+                                        new Exception("DeleteException"));
+                            }
+
+                            fail();
+                            return null;
+                        });
+
+        var job = TestUtils.buildSessionJob();
+        var jobId = new JobID();
+
+        assertThrows(
+                FlinkRuntimeException.class,
+                () ->
+                        flinkService.runJar(
+                                job.getSpec().getJob(),
+                                jobId,
+                                new JarUploadResponseBody("test"),
+                                configuration,
+                                null));
+        assertEquals(jobId, jarRuns.get(0).getJobId());
+        assertTrue(deleted.get());
+    }
+
+    private TestingService getTestingService(
+            TriFunction<
+                            MessageHeaders<?, ?, ?>,
+                            MessageParameters,
+                            RequestBody,
+                            CompletableFuture<ResponseBody>>
+                    requestProcessor)
+            throws Exception {
+        var testingClusterClient = new 
TestingClusterClient<String>(configuration);
+        testingClusterClient.setRequestProcessor(requestProcessor);
+        var testingRestClient = new TestingRestClient(configuration);
+        testingRestClient.setRequestProcessor(requestProcessor);
+        return new TestingService(testingClusterClient, testingRestClient);
+    }
+
+    @Test
+    public void cancelJobWithStatelessUpgradeModeTest() throws Exception {
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
+        final CompletableFuture<JobID> cancelFuture = new 
CompletableFuture<>();
+        testingClusterClient.setCancelFunction(
+                jobID -> {
+                    cancelFuture.complete(jobID);
+                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                });
+
+        var flinkService = new TestingService(testingClusterClient);
+
+        JobID jobID = JobID.generate();
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        JobStatus jobStatus = deployment.getStatus().getJobStatus();
+        jobStatus.setJobId(jobID.toHexString());
+        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
+
+        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+        deployment.getStatus().getJobStatus().setState("RUNNING");
+        flinkService.cancelJob(
+                deployment,
+                UpgradeMode.STATELESS,
+                configManager.getObserveConfig(deployment),
+                false);
+        assertTrue(cancelFuture.isDone());
+        assertEquals(jobID, cancelFuture.get());
+        assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void cancelJobWithSavepointUpgradeModeTest(boolean 
deleteAfterSavepoint)
+            throws Exception {
+        var testingClusterClient =
+                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
+        CompletableFuture<Tuple3<JobID, Boolean, String>> 
stopWithSavepointFuture =
+                new CompletableFuture<>();
+        var savepointPath = "file:///path/of/svp-1";
+        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointPath);
+        testingClusterClient.setStopWithSavepointFunction(
+                (jobID, advanceToEndOfEventTime, savepointDir) -> {
+                    stopWithSavepointFuture.complete(
+                            new Tuple3<>(jobID, advanceToEndOfEventTime, 
savepointDir));
+                    return CompletableFuture.completedFuture(savepointPath);
+                });
+
+        var flinkService = new TestingService(testingClusterClient);
+
+        JobID jobID = JobID.generate();
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), 
savepointPath);
+        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+        JobStatus jobStatus = deployment.getStatus().getJobStatus();
+        jobStatus.setJobId(jobID.toHexString());
+        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
+
+        flinkService.cancelJob(
+                deployment,
+                UpgradeMode.SAVEPOINT,
+                configManager.getObserveConfig(deployment),
+                deleteAfterSavepoint);
+        assertTrue(stopWithSavepointFuture.isDone());
+        assertEquals(jobID, stopWithSavepointFuture.get().f0);
+        assertFalse(stopWithSavepointFuture.get().f1);
+        assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
+        assertEquals(savepointPath, 
jobStatus.getSavepointInfo().getLastSavepoint().getLocation());
+
+        assertEquals(jobStatus.getState(), 
org.apache.flink.api.common.JobStatus.FINISHED.name());
+        assertEquals(
+                deployment.getStatus().getJobManagerDeploymentStatus(),
+                deleteAfterSavepoint
+                        ? JobManagerDeploymentStatus.MISSING
+                        : JobManagerDeploymentStatus.READY);
+        if (deleteAfterSavepoint) {
+            assertEquals(List.of(deployment.getMetadata()), flinkService.deleted);
+        } else {
+            assertTrue(flinkService.deleted.isEmpty());
+        }
+    }
+
+    @Test
+    public void cancelJobWithLastStateUpgradeModeTest() throws Exception {
+        var deployment = TestUtils.buildApplicationCluster();
+        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
+        var testingClusterClient =
+                new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
+        var flinkService = new TestingService(testingClusterClient);
+
+        JobID jobID = JobID.generate();
+        JobStatus jobStatus = deployment.getStatus().getJobStatus();
+        jobStatus.setJobId(jobID.toHexString());
+
+        flinkService.cancelJob(
+                deployment,
+                UpgradeMode.LAST_STATE,
+                configManager.getObserveConfig(deployment),
+                false);
+        assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
+    }
+
+    @Test
+    public void deletionPropagationTest() {
+        var propagation = new ArrayList<DeletionPropagation>();
+        TestingService flinkService =
+                new TestingService(null) {
+                    @Override
+                    protected void deleteClusterInternal(
+                            ObjectMeta meta,
+                            Configuration conf,
+                            boolean deleteHaData,
+                            DeletionPropagation deletionPropagation) {
+                        propagation.add(deletionPropagation);
+                    }
+                };
+
+        flinkService.deleteClusterDeployment(
+                new ObjectMeta(), new FlinkDeploymentStatus(), configuration, true);
+        assertEquals(DeletionPropagation.FOREGROUND, propagation.get(0));
+
+        configuration.set(
+                KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION,
+                DeletionPropagation.BACKGROUND);
+        operatorConfig = FlinkOperatorConfiguration.fromConfiguration(configuration);
+
+        flinkService =
+                new TestingService(null) {
+                    @Override
+                    protected void deleteClusterInternal(
+                            ObjectMeta meta,
+                            Configuration conf,
+                            boolean deleteHaData,
+                            DeletionPropagation deletionPropagation) {
+                        propagation.add(deletionPropagation);
+                    }
+                };
+        flinkService.deleteClusterDeployment(
+                new ObjectMeta(), new FlinkDeploymentStatus(), configuration, true);
+        assertEquals(DeletionPropagation.BACKGROUND, propagation.get(1));
+    }
+
+    @Test
+    public void triggerSavepointTest() throws Exception {
+        CompletableFuture<Tuple3<JobID, String, Boolean>> triggerSavepointFuture =
+                new CompletableFuture<>();
+        String savepointPath = "file:///path/of/svp";
+        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
+        var flinkService =
+                getTestingService(
+                        (headers, parameters, requestBody) -> {
+                            triggerSavepointFuture.complete(
+                                    new Tuple3<>(
+                                            ((SavepointTriggerMessageParameters) parameters)
+                                                    .jobID.getValue(),
+                                            ((SavepointTriggerRequestBody) requestBody)
+                                                    .getTargetDirectory()
+                                                    .get(),
+                                            ((SavepointTriggerRequestBody) requestBody)
+                                                    .isCancelJob()));
+                            return CompletableFuture.completedFuture(
+                                    new TriggerResponse(new TriggerId()));
+                        });
+
+        var jobID = JobID.generate();
+        var flinkDeployment = TestUtils.buildApplicationCluster();
+        ReconciliationUtils.updateStatusForDeployedSpec(flinkDeployment, new Configuration());
+        JobStatus jobStatus = new JobStatus();
+        jobStatus.setJobId(jobID.toString());
+        flinkDeployment.getStatus().setJobStatus(jobStatus);
+        flinkService.triggerSavepoint(
+                flinkDeployment.getStatus().getJobStatus().getJobId(),
+                SavepointTriggerType.MANUAL,
+                flinkDeployment.getStatus().getJobStatus().getSavepointInfo(),
+                configuration);
+        assertTrue(triggerSavepointFuture.isDone());
+        assertEquals(jobID, triggerSavepointFuture.get().f0);
+        assertEquals(savepointPath, triggerSavepointFuture.get().f1);
+        assertFalse(triggerSavepointFuture.get().f2);
+    }
+
+    @Test
+    public void disposeSavepointTest() throws Exception {
+        var savepointPath = "file:///path/of/svp";
+        var tested = new AtomicBoolean(false);
+        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
+        var flinkService =
+                getTestingService(
+                        (h, p, r) -> {
+                            if (r instanceof SavepointDisposalRequest) {
+                                var dr = (SavepointDisposalRequest) r;
+                                assertEquals(savepointPath, dr.getSavepointPath());
+                                tested.set(true);
+                                return CompletableFuture.completedFuture(null);
+                            }
+                            fail("unknown request");
+                            return null;
+                        });
+        flinkService.disposeSavepoint(savepointPath, configuration);
+        assertTrue(tested.get());
+    }
+
+    @Test
+    public void nativeSavepointFormatTest() throws Exception {
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
+        final String savepointPath = "file:///path/of/svp";
+        final CompletableFuture<Tuple4<JobID, String, Boolean, SavepointFormatType>>
+                triggerSavepointFuture = new CompletableFuture<>();
+        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
+        testingClusterClient.setRequestProcessor(
+                (headers, parameters, requestBody) -> {
+                    triggerSavepointFuture.complete(
+                            new Tuple4<>(
+                                    ((SavepointTriggerMessageParameters) parameters)
+                                            .jobID.getValue(),
+                                    ((SavepointTriggerRequestBody) requestBody)
+                                            .getTargetDirectory()
+                                            .get(),
+                                    ((SavepointTriggerRequestBody) requestBody).isCancelJob(),
+                                    ((SavepointTriggerRequestBody) requestBody).getFormatType()));
+                    return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId()));
+                });
+        final CompletableFuture<Tuple3<JobID, SavepointFormatType, String>>
+                stopWithSavepointFuture = new CompletableFuture<>();
+        testingClusterClient.setStopWithSavepointFormat(
+                (id, formatType, savepointDir) -> {
+                    stopWithSavepointFuture.complete(new Tuple3<>(id, formatType, savepointDir));
+                    return CompletableFuture.completedFuture(savepointPath);
+                });
+
+        var flinkService = new TestingService(testingClusterClient);
+
+        final JobID jobID = JobID.generate();
+        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath);
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+        JobStatus jobStatus = deployment.getStatus().getJobStatus();
+        jobStatus.setJobId(jobID.toHexString());
+        jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
+
+        jobStatus.setJobId(jobID.toString());
+        deployment.getStatus().setJobStatus(jobStatus);
+        flinkService.triggerSavepoint(
+                deployment.getStatus().getJobStatus().getJobId(),
+                SavepointTriggerType.MANUAL,
+                deployment.getStatus().getJobStatus().getSavepointInfo(),
+                new Configuration(configuration)
+                        .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE));
+        assertTrue(triggerSavepointFuture.isDone());
+        assertEquals(jobID, triggerSavepointFuture.get().f0);
+        assertEquals(savepointPath, triggerSavepointFuture.get().f1);
+        assertFalse(triggerSavepointFuture.get().f2);
+        assertEquals(SavepointFormatType.NATIVE, triggerSavepointFuture.get().f3);
+
+        flinkService.cancelJob(
+                deployment,
+                UpgradeMode.SAVEPOINT,
+                new Configuration(configManager.getObserveConfig(deployment))
+                        .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE),
+                false);
+        assertTrue(stopWithSavepointFuture.isDone());
+        assertEquals(jobID, stopWithSavepointFuture.get().f0);
+        assertEquals(SavepointFormatType.NATIVE, stopWithSavepointFuture.get().f1);
+        assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
+    }
+
+    @Test
+    public void getLastCheckpointTest() throws Exception {
+        ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+        var responseContainer = new ArrayList<CheckpointHistoryWrapper>();
+        var flinkService =
+                getTestingService(
+                        (headers, parameters, requestBody) -> {
+                            if (headers instanceof CustomCheckpointingStatisticsHeaders) {
+                                return CompletableFuture.completedFuture(responseContainer.get(0));
+                            }
+                            fail("unknown request");
+                            return null;
+                        });
+
+        String responseWithHistory =
+                
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
 [...]
+        String responseWithoutHistory =
+                
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
 [...]
+        String responseWithoutHistoryInternal =
+                
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
 [...]
+
+        responseContainer.add(
+                objectMapper.readValue(responseWithHistory, CheckpointHistoryWrapper.class));
+        var checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new Configuration());
+        assertEquals(
+                "file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96",
+                checkpointOpt.get().getLocation());
+
+        responseContainer.set(
+                0, objectMapper.readValue(responseWithoutHistory, CheckpointHistoryWrapper.class));
+        checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new Configuration());
+        assertEquals(
+                "file:/flink-data/savepoints/savepoint-000000-5930e5326ca7",
+                checkpointOpt.get().getLocation());
+
+        responseContainer.set(
+                0,
+                objectMapper.readValue(
+                        responseWithoutHistoryInternal, CheckpointHistoryWrapper.class));
+        try {
+            flinkService.getLastCheckpoint(new JobID(), new Configuration());
+            fail();
+        } catch (RecoveryFailureException dpe) {
+
+        }
+    }
+
+    @Test
+    public void fetchSavepointInfoTest() throws Exception {
+        var triggerId = new TriggerId();
+        var jobId = new JobID();
+        var response = new AtomicReference<AsynchronousOperationResult<SavepointInfo>>();
+        var flinkService =
+                getTestingService(
+                        (h, p, r) -> {
+                            if (p instanceof SavepointStatusMessageParameters) {
+                                var params = (SavepointStatusMessageParameters) p;
+                                assertEquals(jobId, params.jobIdPathParameter.getValue());
+                                assertEquals(triggerId, params.triggerIdPathParameter.getValue());
+                                if (response.get() == null) {
+                                    return CompletableFuture.failedFuture(new Exception("fail"));
+                                }
+                                return CompletableFuture.completedFuture(response.get());
+                            }
+                            fail("unknown request");
+                            return null;
+                        });
+
+        response.set(AsynchronousOperationResult.completed(new SavepointInfo("l", null)));
+        assertEquals(
+                SavepointFetchResult.completed("l"),
+                flinkService.fetchSavepointInfo(
+                        triggerId.toString(), jobId.toString(), configuration));
+
+        response.set(AsynchronousOperationResult.inProgress());
+        assertEquals(
+                SavepointFetchResult.pending(),
+                flinkService.fetchSavepointInfo(
+                        triggerId.toString(), jobId.toString(), configuration));
+
+        response.set(
+                AsynchronousOperationResult.completed(
+                        new SavepointInfo(
+                                null, new SerializedThrowable(new Exception("testErr")))));
+        assertTrue(
+                flinkService
+                        .fetchSavepointInfo(triggerId.toString(), jobId.toString(), configuration)
+                        .getError()
+                        .contains("testErr"));
+
+        response.set(null);
+        assertTrue(
+                flinkService
+                        .fetchSavepointInfo(triggerId.toString(), jobId.toString(), configuration)
+                        .getError()
+                        .contains("fail"));
+    }
+
+    @Test
+    public void removeOperatorConfigTest() {
+        var key = "kubernetes.operator.meyKey";
+        var deployConfig = Configuration.fromMap(Map.of("kubernetes.operator.meyKey", "v"));
+        var newConf = AbstractFlinkService.removeOperatorConfigs(deployConfig);
+        assertFalse(newConf.containsKey(key));
+    }
+
+    @Test
+    public void getMetricsTest() throws Exception {
+        var jobId = new JobID();
+        var metricNames = List.of("m1", "m2");
+        var flinkService =
+                getTestingService(
+                        (h, p, r) -> {
+                            if (p instanceof JobMetricsMessageParameters) {
+                                var jmmp = ((JobMetricsMessageParameters) p);
+                                assertEquals(jobId, jmmp.jobPathParameter.getValue());
+                                var output =
+                                        jmmp.metricsFilterParameter.getValue().stream()
+                                                .map(s -> new Metric(s, s))
+                                                .collect(Collectors.toList());
+                                return CompletableFuture.completedFuture(
+                                        new MetricCollectionResponseBody(output));
+                            }
+                            fail("unknown request");
+                            return null;
+                        });
+        assertEquals(
+                Map.of("m1", "m1", "m2", "m2"),
+                flinkService.getMetrics(configuration, jobId.toHexString(), metricNames));
+    }
+
+    @Test
+    public void getClusterInfoTest() throws Exception {
+        var config = new CustomDashboardConfiguration();
+        var testVersion = "testVersion";
+        var testRevision = "testRevision";
+        config.setFlinkVersion(testVersion);
+        config.setFlinkRevision(testRevision);
+
+        var tmInfo =
+                new TaskManagerInfo(
+                        ResourceID.generate(),
+                        "",
+                        0,
+                        0,
+                        0L,
+                        0,
+                        0,
+                        ResourceProfile.UNKNOWN,
+                        ResourceProfile.UNKNOWN,
+                        new HardwareDescription(1, 0L, 0L, 0L),
+                        new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L),
+                        null);
+        var tmsInfo = new TaskManagersInfo(List.of(tmInfo));
+
+        var flinkService =
+                getTestingService(
+                        (h, p, r) -> {
+                            if (h instanceof CustomDashboardConfigurationHeaders) {
+                                return CompletableFuture.completedFuture(config);
+                            } else if (h instanceof TaskManagersHeaders) {
+                                return CompletableFuture.completedFuture(tmsInfo);
+                            }
+                            fail("unknown request");
+                            return null;
+                        });
+
+        var conf = new Configuration();
+        conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000));
+        conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000));
+
+        assertEquals(
+                Map.of(
+                        DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
+                        testVersion,
+                        DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
+                        testRevision,
+                        AbstractFlinkService.FIELD_NAME_TOTAL_CPU,
+                        "2.0",
+                        AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY,
+                        "" + MemorySize.ofMebiBytes(1000).getBytes() * 2),
+                flinkService.getClusterInfo(conf));
+    }
+
+    @Test
+    public void effectiveStatusTest() {
+        JobDetails allRunning =
+                getJobDetails(
+                        org.apache.flink.api.common.JobStatus.RUNNING,
+                        Tuple2.of(ExecutionState.RUNNING, 4));
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RUNNING,
+                AbstractFlinkService.getEffectiveStatus(allRunning));
+
+        JobDetails allRunningOrFinished =
+                getJobDetails(
+                        org.apache.flink.api.common.JobStatus.RUNNING,
+                        Tuple2.of(ExecutionState.RUNNING, 2),
+                        Tuple2.of(ExecutionState.FINISHED, 2));
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RUNNING,
+                AbstractFlinkService.getEffectiveStatus(allRunningOrFinished));
+
+        JobDetails allRunningOrScheduled =
+                getJobDetails(
+                        org.apache.flink.api.common.JobStatus.RUNNING,
+                        Tuple2.of(ExecutionState.RUNNING, 2),
+                        Tuple2.of(ExecutionState.SCHEDULED, 2));
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.CREATED,
+                AbstractFlinkService.getEffectiveStatus(allRunningOrScheduled));
+
+        JobDetails allFinished =
+                getJobDetails(
+                        org.apache.flink.api.common.JobStatus.FINISHED,
+                        Tuple2.of(ExecutionState.FINISHED, 4));
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                AbstractFlinkService.getEffectiveStatus(allFinished));
+    }
+
+    private JobDetails getJobDetails(
+            org.apache.flink.api.common.JobStatus status,
+            Tuple2<ExecutionState, Integer>... tasksPerState) {
+        int[] countPerState = new int[ExecutionState.values().length];
+        for (var taskPerState : tasksPerState) {
+            countPerState[taskPerState.f0.ordinal()] = taskPerState.f1;
+        }
+        int numTasks = Arrays.stream(countPerState).sum();
+        return new JobDetails(
+                new JobID(),
+                "test-job",
+                System.currentTimeMillis(),
+                -1,
+                0,
+                status,
+                System.currentTimeMillis(),
+                countPerState,
+                numTasks);
+    }
+
+    @Test
+    public void isJobManagerReadyTest() throws Exception {
+        AtomicReference<String> url = new AtomicReference<>();
+        var clusterClient =
+                new TestingClusterClient<String>(configuration) {
+                    @Override
+                    public String getWebInterfaceURL() {
+                        return url.get();
+                    }
+                };
+        var flinkService = new TestingService(clusterClient);
+
+        assertThrows(
+                FlinkRuntimeException.class,
+                () -> flinkService.isJobManagerPortReady(configuration));
+
+        int port = 6868;
+        url.set("http://127.0.0.1:" + port);
+
+        assertFalse(flinkService.isJobManagerPortReady(configuration));
+        try (var socket = new ServerSocket(port)) {
+            assertTrue(flinkService.isJobManagerPortReady(configuration));
+        }
+    }
+
+    class TestingService extends AbstractFlinkService {
+
+        RestClusterClient<String> clusterClient;
+        RestClient restClient;
+        List<ObjectMeta> deleted = new ArrayList<>();
+
+        Map<Tuple2<String, String>, PodList> jmPods = new HashMap<>();
+        Map<Tuple2<String, String>, PodList> tmPods = new HashMap<>();
+
+        TestingService(RestClusterClient<String> clusterClient) {
+            this(clusterClient, null);
+        }
+
+        TestingService(RestClusterClient<String> clusterClient, RestClient restClient) {
+            super(
+                    client,
+                    AbstractFlinkServiceTest.this.artifactManager,
+                    AbstractFlinkServiceTest.this.executorService,
+                    AbstractFlinkServiceTest.this.operatorConfig);
+            this.clusterClient = clusterClient;
+            this.restClient = restClient;
+        }
+
+        @Override
+        public RestClusterClient<String> getClusterClient(Configuration config) {
+            return clusterClient;
+        }
+
+        @Override
+        protected RestClient getRestClient(Configuration conf) throws ConfigurationException {
+            return restClient;
+        }
+
+        @Override
+        protected PodList getJmPodList(String namespace, String clusterId) {
+            return jmPods.getOrDefault(Tuple2.of(namespace, clusterId), new PodList());
+        }
+
+        @Override
+        protected PodList getTmPodList(String namespace, String clusterId) {
+            return tmPods.getOrDefault(Tuple2.of(namespace, clusterId), new PodList());
+        }
+
+        @Override
+        protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void submitSessionCluster(Configuration conf) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void cancelJob(
+                FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean scale(FlinkResourceContext<?> resourceContext) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean scalingCompleted(FlinkResourceContext<?> resourceContext) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        protected void deleteClusterInternal(
+                ObjectMeta meta,
+                Configuration conf,
+                boolean deleteHaData,
+                DeletionPropagation deletionPropagation) {
+            deleted.add(meta);
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index bb78814c..fe2d1944 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -18,62 +18,41 @@
 package org.apache.flink.kubernetes.operator.service;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingClusterClient;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.api.status.JobStatus;
-import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
-import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
-import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
-import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
-import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
 import org.apache.flink.util.concurrent.Executors;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
 import io.fabric8.kubernetes.api.model.DeletionPropagation;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -83,18 +62,16 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT;
-import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -126,93 +103,21 @@ public class NativeFlinkServiceTest {
     }
 
     @Test
-    public void testCancelJobWithStatelessUpgradeMode() throws Exception {
-        final TestingClusterClient<String> testingClusterClient =
-                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
-        final CompletableFuture<JobID> cancelFuture = new 
CompletableFuture<>();
-        testingClusterClient.setCancelFunction(
-                jobID -> {
-                    cancelFuture.complete(jobID);
-                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                });
-
-        final FlinkService flinkService = 
createFlinkService(testingClusterClient);
-
-        JobID jobID = JobID.generate();
-        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        JobStatus jobStatus = deployment.getStatus().getJobStatus();
-        jobStatus.setJobId(jobID.toHexString());
-        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
-
-        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        deployment.getStatus().getJobStatus().setState("RUNNING");
-        flinkService.cancelJob(
-                deployment, UpgradeMode.STATELESS, 
configManager.getObserveConfig(deployment));
-        assertTrue(cancelFuture.isDone());
-        assertEquals(jobID, cancelFuture.get());
-        assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
-    }
-
-    @ParameterizedTest
-    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
-    public void testCancelJobWithSavepointUpgradeMode(FlinkVersion 
flinkVersion) throws Exception {
-        final TestingClusterClient<String> testingClusterClient =
-                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
-        final CompletableFuture<Tuple3<JobID, Boolean, String>> 
stopWithSavepointFuture =
-                new CompletableFuture<>();
-        final String savepointPath = "file:///path/of/svp-1";
-        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointPath);
-        testingClusterClient.setStopWithSavepointFunction(
-                (jobID, advanceToEndOfEventTime, savepointDir) -> {
-                    stopWithSavepointFuture.complete(
-                            new Tuple3<>(jobID, advanceToEndOfEventTime, 
savepointDir));
-                    return CompletableFuture.completedFuture(savepointPath);
-                });
-
-        final FlinkService flinkService = 
createFlinkService(testingClusterClient);
-
-        JobID jobID = JobID.generate();
-        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        deployment
-                .getSpec()
-                .getFlinkConfiguration()
-                .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), 
savepointPath);
-        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        JobStatus jobStatus = deployment.getStatus().getJobStatus();
-        jobStatus.setJobId(jobID.toHexString());
-        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
-        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
-
-        deployment.getSpec().setFlinkVersion(flinkVersion);
-        flinkService.cancelJob(
-                deployment, UpgradeMode.SAVEPOINT, 
configManager.getObserveConfig(deployment));
-        assertTrue(stopWithSavepointFuture.isDone());
-        assertEquals(jobID, stopWithSavepointFuture.get().f0);
-        assertFalse(stopWithSavepointFuture.get().f1);
-        assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
-        assertEquals(savepointPath, 
jobStatus.getSavepointInfo().getLastSavepoint().getLocation());
-
-        assertEquals(jobStatus.getState(), 
org.apache.flink.api.common.JobStatus.FINISHED.name());
-        if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
-            assertEquals(
-                    deployment.getStatus().getJobManagerDeploymentStatus(),
-                    JobManagerDeploymentStatus.READY);
-        } else {
-            assertEquals(
-                    deployment.getStatus().getJobManagerDeploymentStatus(),
-                    JobManagerDeploymentStatus.MISSING);
-        }
-    }
-
-    @Test
-    public void testCancelJobWithLastStateUpgradeMode() throws Exception {
-        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+    public void testDeleteClusterInternal() {
+        var deployment = TestUtils.buildApplicationCluster();
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
-        final TestingClusterClient<String> testingClusterClient =
-                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
-        final FlinkService flinkService = 
createFlinkService(testingClusterClient);
+        var flinkService = createFlinkService(null);
 
-        client.resource(createTestingDeployment()).create();
+        var dep =
+                new DeploymentBuilder()
+                        .withNewMetadata()
+                        .withName(TestUtils.TEST_DEPLOYMENT_NAME)
+                        .withNamespace(TestUtils.TEST_NAMESPACE)
+                        .endMetadata()
+                        .withNewSpec()
+                        .endSpec()
+                        .build();
+        client.resource(dep).create();
 
         assertNotNull(
                 client.apps()
@@ -221,13 +126,11 @@ public class NativeFlinkServiceTest {
                         .withName(TestUtils.TEST_DEPLOYMENT_NAME)
                         .get());
 
-        JobID jobID = JobID.generate();
-        JobStatus jobStatus = deployment.getStatus().getJobStatus();
-        jobStatus.setJobId(jobID.toHexString());
-
-        flinkService.cancelJob(
-                deployment, UpgradeMode.LAST_STATE, 
configManager.getObserveConfig(deployment));
-        assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
+        flinkService.deleteClusterInternal(
+                deployment.getMetadata(),
+                configManager.getObserveConfig(deployment),
+                false,
+                DeletionPropagation.FOREGROUND);
         assertNull(
                 client.apps()
                         .deployments()
@@ -236,151 +139,31 @@ public class NativeFlinkServiceTest {
                         .get());
     }
 
-    @Test
-    public void testTriggerSavepoint() throws Exception {
-        final TestingClusterClient<String> testingClusterClient =
-                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
-        final CompletableFuture<Tuple3<JobID, String, Boolean>> 
triggerSavepointFuture =
-                new CompletableFuture<>();
-        final String savepointPath = "file:///path/of/svp";
-        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointPath);
-        testingClusterClient.setRequestProcessor(
-                (headers, parameters, requestBody) -> {
-                    triggerSavepointFuture.complete(
-                            new Tuple3<>(
-                                    ((SavepointTriggerMessageParameters) 
parameters)
-                                            .jobID.getValue(),
-                                    ((SavepointTriggerRequestBody) requestBody)
-                                            .getTargetDirectory()
-                                            .get(),
-                                    ((SavepointTriggerRequestBody) 
requestBody).isCancelJob()));
-                    return CompletableFuture.completedFuture(new 
TriggerResponse(new TriggerId()));
-                });
-
-        final FlinkService flinkService = 
createFlinkService(testingClusterClient);
-
-        final JobID jobID = JobID.generate();
-        final FlinkDeployment flinkDeployment = 
TestUtils.buildApplicationCluster();
-        ReconciliationUtils.updateStatusForDeployedSpec(flinkDeployment, new 
Configuration());
-        JobStatus jobStatus = new JobStatus();
-        jobStatus.setJobId(jobID.toString());
-        flinkDeployment.getStatus().setJobStatus(jobStatus);
-        flinkService.triggerSavepoint(
-                flinkDeployment.getStatus().getJobStatus().getJobId(),
-                SavepointTriggerType.MANUAL,
-                flinkDeployment.getStatus().getJobStatus().getSavepointInfo(),
-                configuration);
-        assertTrue(triggerSavepointFuture.isDone());
-        assertEquals(jobID, triggerSavepointFuture.get().f0);
-        assertEquals(savepointPath, triggerSavepointFuture.get().f1);
-        assertFalse(triggerSavepointFuture.get().f2);
-    }
-
-    @Test
-    public void testGetLastCheckpoint() throws Exception {
-        ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-        var testingClusterClient =
-                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
-
-        String responseWithHistory =
-                
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
 [...]
-        String responseWithoutHistory =
-                
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
 [...]
-        String responseWithoutHistoryInternal =
-                
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11
 [...]
-
-        var responseContainer = new ArrayList<CheckpointHistoryWrapper>();
-
-        testingClusterClient.setRequestProcessor(
-                (headers, parameters, requestBody) -> {
-                    if (headers instanceof 
CustomCheckpointingStatisticsHeaders) {
-                        return 
CompletableFuture.completedFuture(responseContainer.get(0));
+    @ParameterizedTest
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void testDeleteOnSavepointBefore1_15(FlinkVersion flinkVersion) 
throws Exception {
+        AtomicBoolean tested = new AtomicBoolean(false);
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder) {
+                    @Override
+                    protected void cancelJob(
+                            FlinkDeployment deployment,
+                            UpgradeMode upgradeMode,
+                            Configuration conf,
+                            boolean deleteClusterAfterSavepoint) {
+                        assertEquals(
+                                
flinkVersion.isNewerVersionThan(FlinkVersion.v1_14) ? false : true,
+                                deleteClusterAfterSavepoint);
+                        tested.set(true);
                     }
-                    fail("unknown request");
-                    return null;
-                });
-
-        var flinkService = createFlinkService(testingClusterClient);
-
-        responseContainer.add(
-                objectMapper.readValue(responseWithHistory, 
CheckpointHistoryWrapper.class));
-        var checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new 
Configuration());
-        assertEquals(
-                
"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96",
-                checkpointOpt.get().getLocation());
-
-        responseContainer.set(
-                0, objectMapper.readValue(responseWithoutHistory, 
CheckpointHistoryWrapper.class));
-        checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new 
Configuration());
-        assertEquals(
-                "file:/flink-data/savepoints/savepoint-000000-5930e5326ca7",
-                checkpointOpt.get().getLocation());
-
-        responseContainer.set(
-                0,
-                objectMapper.readValue(
-                        responseWithoutHistoryInternal, 
CheckpointHistoryWrapper.class));
-        try {
-            flinkService.getLastCheckpoint(new JobID(), new Configuration());
-            fail();
-        } catch (RecoveryFailureException dpe) {
-
-        }
-    }
-
-    @Test
-    public void testGetLastSavepointRestCompatibility() throws 
JsonProcessingException {
-        ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-        String flink14Response =
-                
"{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0},\"summary\":{\"state_size\":{\"min\":8646,\"max\":25626,\"avg\":17136},\"end_to_end_duration\":{\"min\":95,\"max\":420,\"avg\":257},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0},\"processed_data\":{\"min\":0,\"max\":70,\"avg\":35},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0}},\"latest\":{\"completed\":{\"@class\":\"completed\",\"id\":1,\"status\":\"COMPLETED\",\"is_save
 [...]
-        String flink15Response =
-                
"{\"counts\":{\"restored\":0,\"total\":12,\"in_progress\":0,\"completed\":3,\"failed\":9},\"summary\":{\"checkpointed_size\":{\"min\":4308,\"max\":16053,\"avg\":11856,\"p50\":15207,\"p90\":16053,\"p95\":16053,\"p99\":16053,\"p999\":16053},\"state_size\":{\"min\":4308,\"max\":16053,\"avg\":11856,\"p50\":15207,\"p90\":16053,\"p95\":16053,\"p99\":16053,\"p999\":16053},\"end_to_end_duration\":{\"min\":31,\"max\":117,\"avg\":61,\"p50\":36,\"p90\":117,\"p95\":117,\"p99\":117,\"
 [...]
-
-        objectMapper.readValue(flink14Response, 
CheckpointHistoryWrapper.class);
-        objectMapper.readValue(flink15Response, 
CheckpointHistoryWrapper.class);
-    }
-
-    @Test
-    public void testGetInProgressCheckpointsFromResponseWithoutHistoryDetails()
-            throws JsonProcessingException {
-        ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-        String response =
-                
"{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0}}";
-        var checkpointHistoryWrapper =
-                objectMapper.readValue(response, 
CheckpointHistoryWrapper.class);
-        Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> 
optionalPendingCheckpointInfo =
-                
assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
-        assertTrue(optionalPendingCheckpointInfo.isEmpty());
-    }
-
-    @Test
-    public void testGetInProgressCheckpointsWithoutHistory() {
-        CheckpointHistoryWrapper checkpointHistoryWrapper = new 
CheckpointHistoryWrapper();
-        Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> 
optionalPendingCheckpointInfo =
-                
assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
-        assertTrue(optionalPendingCheckpointInfo.isEmpty());
-    }
-
-    @Test
-    public void testClusterInfoRestCompatibility() throws 
JsonProcessingException {
-        ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-
-        String flink13Response =
-                "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.13.6\",\"flink-revision\":\"b2ca390
 @ 2022-02-03T14:54:22+01:00\",\"features\":{\"web-submit\":false}}";
-        String flink14Response =
-                "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated 
Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609
 @ 
2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
-
-        var dashboardConfiguration =
-                objectMapper.readValue(flink13Response, 
CustomDashboardConfiguration.class);
-        dashboardConfiguration =
-                objectMapper.readValue(flink14Response, 
CustomDashboardConfiguration.class);
-    }
+                };
 
-    @Test
-    public void testRemoveOperatorConfig() {
-        Configuration deployConfig = createOperatorConfig();
-        Configuration newConf = 
AbstractFlinkService.removeOperatorConfigs(deployConfig);
-        assertFalse(newConf.containsKey(OPERATOR_HEALTH_PROBE_PORT.key()));
+        flinkService.cancelJob(
+                TestUtils.buildApplicationCluster(flinkVersion),
+                UpgradeMode.SAVEPOINT,
+                new Configuration());
+        assertTrue(tested.get());
     }
 
     @Test
@@ -409,209 +192,6 @@ public class NativeFlinkServiceTest {
                 
testingService.getRuntimeConfig().containsKey(OPERATOR_HEALTH_PROBE_PORT.key()));
     }
 
-    @Test
-    public void testEffectiveStatus() {
-
-        JobDetails allRunning =
-                getJobDetails(
-                        org.apache.flink.api.common.JobStatus.RUNNING,
-                        Tuple2.of(ExecutionState.RUNNING, 4));
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING,
-                AbstractFlinkService.getEffectiveStatus(allRunning));
-
-        JobDetails allRunningOrFinished =
-                getJobDetails(
-                        org.apache.flink.api.common.JobStatus.RUNNING,
-                        Tuple2.of(ExecutionState.RUNNING, 2),
-                        Tuple2.of(ExecutionState.FINISHED, 2));
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING,
-                AbstractFlinkService.getEffectiveStatus(allRunningOrFinished));
-
-        JobDetails allRunningOrScheduled =
-                getJobDetails(
-                        org.apache.flink.api.common.JobStatus.RUNNING,
-                        Tuple2.of(ExecutionState.RUNNING, 2),
-                        Tuple2.of(ExecutionState.SCHEDULED, 2));
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.CREATED,
-                
AbstractFlinkService.getEffectiveStatus(allRunningOrScheduled));
-
-        JobDetails allFinished =
-                getJobDetails(
-                        org.apache.flink.api.common.JobStatus.FINISHED,
-                        Tuple2.of(ExecutionState.FINISHED, 4));
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.FINISHED,
-                AbstractFlinkService.getEffectiveStatus(allFinished));
-    }
-
-    @Test
-    public void testNativeSavepointFormat() throws Exception {
-        final TestingClusterClient<String> testingClusterClient =
-                new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
-        final String savepointPath = "file:///path/of/svp";
-        final CompletableFuture<Tuple4<JobID, String, Boolean, 
SavepointFormatType>>
-                triggerSavepointFuture = new CompletableFuture<>();
-        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointPath);
-        testingClusterClient.setRequestProcessor(
-                (headers, parameters, requestBody) -> {
-                    triggerSavepointFuture.complete(
-                            new Tuple4<>(
-                                    ((SavepointTriggerMessageParameters) 
parameters)
-                                            .jobID.getValue(),
-                                    ((SavepointTriggerRequestBody) requestBody)
-                                            .getTargetDirectory()
-                                            .get(),
-                                    ((SavepointTriggerRequestBody) 
requestBody).isCancelJob(),
-                                    ((SavepointTriggerRequestBody) 
requestBody).getFormatType()));
-                    return CompletableFuture.completedFuture(new 
TriggerResponse(new TriggerId()));
-                });
-        final CompletableFuture<Tuple3<JobID, SavepointFormatType, String>>
-                stopWithSavepointFuture = new CompletableFuture<>();
-        testingClusterClient.setStopWithSavepointFormat(
-                (id, formatType, savepointDir) -> {
-                    stopWithSavepointFuture.complete(new Tuple3<>(id, 
formatType, savepointDir));
-                    return CompletableFuture.completedFuture(savepointPath);
-                });
-
-        final FlinkService flinkService = 
createFlinkService(testingClusterClient);
-
-        final JobID jobID = JobID.generate();
-        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        deployment
-                .getSpec()
-                .getFlinkConfiguration()
-                .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), 
savepointPath);
-        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        JobStatus jobStatus = deployment.getStatus().getJobStatus();
-        jobStatus.setJobId(jobID.toHexString());
-        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
-        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
-
-        jobStatus.setJobId(jobID.toString());
-        deployment.getStatus().setJobStatus(jobStatus);
-        flinkService.triggerSavepoint(
-                deployment.getStatus().getJobStatus().getJobId(),
-                SavepointTriggerType.MANUAL,
-                deployment.getStatus().getJobStatus().getSavepointInfo(),
-                new Configuration(configuration)
-                        .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, 
SavepointFormatType.NATIVE));
-        assertTrue(triggerSavepointFuture.isDone());
-        assertEquals(jobID, triggerSavepointFuture.get().f0);
-        assertEquals(savepointPath, triggerSavepointFuture.get().f1);
-        assertFalse(triggerSavepointFuture.get().f2);
-        assertEquals(SavepointFormatType.NATIVE, 
triggerSavepointFuture.get().f3);
-
-        flinkService.cancelJob(
-                deployment,
-                UpgradeMode.SAVEPOINT,
-                new Configuration(configManager.getObserveConfig(deployment))
-                        .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, 
SavepointFormatType.NATIVE));
-        assertTrue(stopWithSavepointFuture.isDone());
-        assertEquals(jobID, stopWithSavepointFuture.get().f0);
-        assertEquals(SavepointFormatType.NATIVE, 
stopWithSavepointFuture.get().f1);
-        assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
-    }
-
-    @Test
-    public void testDeletionPropagation() {
-        var propagation = new ArrayList<DeletionPropagation>();
-        NativeFlinkService flinkService =
-                new NativeFlinkService(
-                        client, null, executorService, operatorConfig, 
eventRecorder) {
-                    @Override
-                    protected void deleteClusterInternal(
-                            ObjectMeta meta,
-                            Configuration conf,
-                            boolean deleteHaData,
-                            DeletionPropagation deletionPropagation) {
-                        propagation.add(deletionPropagation);
-                    }
-                };
-
-        flinkService.deleteClusterDeployment(
-                new ObjectMeta(), new FlinkDeploymentStatus(), configuration, 
true);
-        assertEquals(DeletionPropagation.FOREGROUND, propagation.get(0));
-
-        configuration.set(
-                KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION,
-                DeletionPropagation.BACKGROUND);
-
-        flinkService =
-                new NativeFlinkService(
-                        client,
-                        null,
-                        executorService,
-                        
FlinkOperatorConfiguration.fromConfiguration(configuration),
-                        eventRecorder) {
-                    @Override
-                    protected void deleteClusterInternal(
-                            ObjectMeta meta,
-                            Configuration conf,
-                            boolean deleteHaData,
-                            DeletionPropagation deletionPropagation) {
-                        propagation.add(deletionPropagation);
-                    }
-                };
-        flinkService.deleteClusterDeployment(
-                new ObjectMeta(), new FlinkDeploymentStatus(), configuration, 
true);
-        assertEquals(DeletionPropagation.BACKGROUND, propagation.get(1));
-    }
-
-    @Test
-    public void testSendConfigOnRunJar() throws Exception {
-        var jarRuns = new ArrayList<JarRunRequestBody>();
-        var flinkService =
-                new NativeFlinkService(
-                        client, null, executorService, operatorConfig, 
eventRecorder) {
-                    @Override
-                    public RestClusterClient<String> 
getClusterClient(Configuration conf)
-                            throws Exception {
-                        var client = new TestingClusterClient<String>(conf);
-                        client.setRequestProcessor(
-                                (h, p, b) -> {
-                                    jarRuns.add((JarRunRequestBody) b);
-                                    return 
CompletableFuture.completedFuture(null);
-                                });
-                        return client;
-                    }
-
-                    @Override
-                    protected JarUploadResponseBody uploadJar(
-                            ObjectMeta objectMeta, FlinkSessionJobSpec spec, 
Configuration conf) {
-                        return new JarUploadResponseBody("test");
-                    }
-
-                    @Override
-                    protected void deleteJar(Configuration conf, String jarId) 
{}
-                };
-
-        var session = TestUtils.buildSessionCluster();
-        session.getSpec().setFlinkVersion(FlinkVersion.v1_17);
-        session.getStatus()
-                .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(session.getSpec(), session);
-
-        var job = TestUtils.buildSessionJob();
-        var deployConf = configManager.getSessionJobConfig(session, 
job.getSpec());
-        flinkService.submitJobToSessionCluster(job.getMetadata(), 
job.getSpec(), deployConf, null);
-
-        // Make sure that deploy conf was passed to jar run
-        assertEquals(deployConf.toMap(), 
jarRuns.get(0).getFlinkConfiguration().toMap());
-
-        session.getSpec().setFlinkVersion(FlinkVersion.v1_16);
-        session.getStatus()
-                .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(session.getSpec(), session);
-
-        deployConf = configManager.getSessionJobConfig(session, job.getSpec());
-        flinkService.submitJobToSessionCluster(job.getMetadata(), 
job.getSpec(), deployConf, null);
-
-        assertTrue(jarRuns.get(1).getFlinkConfiguration().toMap().isEmpty());
-    }
-
     @Test
     public void testScaling() throws Exception {
         var v1 = new JobVertexID();
@@ -916,6 +496,47 @@ public class NativeFlinkServiceTest {
         assertTrue(service.scalingCompleted(ctx));
     }
 
+    @Test
+    public void resourceRestApiTest() throws Exception {
+        var testingClusterClient = new 
TestingClusterClient<String>(configuration);
+        var service = (NativeFlinkService) 
createFlinkService(testingClusterClient);
+        var jobId = new JobID();
+
+        var reqs =
+                new JobResourceRequirements(
+                        Map.of(
+                                new JobVertexID(),
+                                new JobVertexResourceRequirements(
+                                        new 
JobVertexResourceRequirements.Parallelism(0, 2))));
+
+        testingClusterClient.setRequestProcessor(
+                (h, p, r) -> {
+                    if (h instanceof JobResourceRequirementsHeaders) {
+                        if (jobId.equals(((JobMessageParameters) 
p).jobPathParameter.getValue())) {
+                            return CompletableFuture.completedFuture(
+                                    new JobResourceRequirementsBody(reqs));
+                        }
+                    } else if (r instanceof JobResourceRequirementsBody) {
+                        if (jobId.equals(((JobMessageParameters) 
p).jobPathParameter.getValue())) {
+                            assertEquals(
+                                    Optional.of(reqs),
+                                    ((JobResourceRequirementsBody) 
r).asJobResourceRequirements());
+                            return CompletableFuture.completedFuture(null);
+                        }
+                    }
+                    fail("unknown request");
+                    return null;
+                });
+
+        var deployment = TestUtils.buildApplicationCluster();
+        deployment.getStatus().getJobStatus().setJobId(jobId.toString());
+        assertEquals(
+                reqs.getJobVertexParallelisms(),
+                service.getVertexResources(testingClusterClient, deployment));
+        service.updateVertexResources(
+                testingClusterClient, deployment, 
reqs.getJobVertexParallelisms());
+    }
+
     public static JobDetailsInfo createJobDetailsFor(
             List<JobDetailsInfo.JobVertexDetailsInfo> vertexInfos) {
         return new JobDetailsInfo(
@@ -961,27 +582,7 @@ public class NativeFlinkServiceTest {
         }
     }
 
-    private JobDetails getJobDetails(
-            org.apache.flink.api.common.JobStatus status,
-            Tuple2<ExecutionState, Integer>... tasksPerState) {
-        int[] countPerState = new int[ExecutionState.values().length];
-        for (var taskPerState : tasksPerState) {
-            countPerState[taskPerState.f0.ordinal()] = taskPerState.f1;
-        }
-        int numTasks = Arrays.stream(countPerState).sum();
-        return new JobDetails(
-                new JobID(),
-                "test-job",
-                System.currentTimeMillis(),
-                -1,
-                0,
-                status,
-                System.currentTimeMillis(),
-                countPerState,
-                numTasks);
-    }
-
-    private FlinkService createFlinkService(RestClusterClient<String> 
clusterClient) {
+    private AbstractFlinkService createFlinkService(RestClusterClient<String> 
clusterClient) {
         return new NativeFlinkService(
                 client, null, executorService, operatorConfig, eventRecorder) {
             @Override
@@ -991,17 +592,6 @@ public class NativeFlinkServiceTest {
         };
     }
 
-    private Deployment createTestingDeployment() {
-        return new DeploymentBuilder()
-                .withNewMetadata()
-                .withName(TestUtils.TEST_DEPLOYMENT_NAME)
-                .withNamespace(TestUtils.TEST_NAMESPACE)
-                .endMetadata()
-                .withNewSpec()
-                .endSpec()
-                .build();
-    }
-
     private Configuration createOperatorConfig() {
         Map<String, String> configMap = Map.of(OPERATOR_HEALTH_PROBE_PORT.key(), "80");
         Configuration deployConfig = Configuration.fromMap(configMap);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/RestResponseTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/RestResponseTest.java
new file mode 100644
index 00000000..a0ffd6e2
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/RestResponseTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for custom rest response classes. */
+public class RestResponseTest {
+
+    private ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+    @Test
+    public void testGetLastSavepointRestCompatibility() {
+        String flink14Response =
+                
"{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0},\"summary\":{\"state_size\":{\"min\":8646,\"max\":25626,\"avg\":17136},\"end_to_end_duration\":{\"min\":95,\"max\":420,\"avg\":257},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0},\"processed_data\":{\"min\":0,\"max\":70,\"avg\":35},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0}},\"latest\":{\"completed\":{\"@class\":\"completed\",\"id\":1,\"status\":\"COMPLETED\",\"is_save
 [...]
+        String flink15Response =
+                
"{\"counts\":{\"restored\":0,\"total\":12,\"in_progress\":0,\"completed\":3,\"failed\":9},\"summary\":{\"checkpointed_size\":{\"min\":4308,\"max\":16053,\"avg\":11856,\"p50\":15207,\"p90\":16053,\"p95\":16053,\"p99\":16053,\"p999\":16053},\"state_size\":{\"min\":4308,\"max\":16053,\"avg\":11856,\"p50\":15207,\"p90\":16053,\"p95\":16053,\"p99\":16053,\"p999\":16053},\"end_to_end_duration\":{\"min\":31,\"max\":117,\"avg\":61,\"p50\":36,\"p90\":117,\"p95\":117,\"p99\":117,\"
 [...]
+
+        assertDoesNotThrow(
+                () -> objectMapper.readValue(flink14Response, CheckpointHistoryWrapper.class));
+        assertDoesNotThrow(
+                () -> objectMapper.readValue(flink15Response, CheckpointHistoryWrapper.class));
+    }
+
+    @Test
+    public void testGetInProgressCheckpointsFromResponseWithoutHistoryDetails()
+            throws JsonProcessingException {
+        String response =
+                "{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0}}";
+        var checkpointHistoryWrapper =
+                objectMapper.readValue(response, CheckpointHistoryWrapper.class);
+        Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo =
+                assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
+        assertTrue(optionalPendingCheckpointInfo.isEmpty());
+    }
+
+    @Test
+    public void testGetInProgressCheckpointsWithoutHistory() {
+        CheckpointHistoryWrapper checkpointHistoryWrapper = new CheckpointHistoryWrapper();
+        Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo =
+                assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
+        assertTrue(optionalPendingCheckpointInfo.isEmpty());
+    }
+
+    @Test
+    public void testClusterInfoRestCompatibility() {
+        String flink13Response =
+                "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.13.6\",\"flink-revision\":\"b2ca390 @ 2022-02-03T14:54:22+01:00\",\"features\":{\"web-submit\":false}}";
+        String flink14Response =
+                "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609 @ 2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}";
+
+        assertDoesNotThrow(
+                () -> objectMapper.readValue(flink13Response, CustomDashboardConfiguration.class));
+        assertDoesNotThrow(
+                () -> objectMapper.readValue(flink14Response, CustomDashboardConfiguration.class));
+    }
+}

Reply via email to