This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit b545bf35451b1cbf4337ef36beb30405547e52ea Author: Gyula Fora <[email protected]> AuthorDate: Thu Jul 13 10:23:38 2023 +0200 [hotfix][tests] Improve Flink service tests --- .../operator/service/AbstractFlinkService.java | 22 +- .../flink/kubernetes/operator/TestUtils.java | 5 +- .../kubernetes/operator/TestingClusterClient.java | 57 +- .../kubernetes/operator/TestingRestClient.java | 73 ++ .../operator/service/AbstractFlinkServiceTest.java | 873 +++++++++++++++++++++ .../operator/service/NativeFlinkServiceTest.java | 586 +++----------- .../operator/service/RestResponseTest.java | 83 ++ 7 files changed, 1148 insertions(+), 551 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 843814f9..88cb42c5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -79,10 +79,10 @@ import org.apache.flink.runtime.webmonitor.handlers.JarDeleteMessageParameters; import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters; import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody; -import org.apache.flink.runtime.webmonitor.handlers.JarRunResponseBody; import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; +import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; @@ -220,7 +220,7 @@ public abstract 
class AbstractFlinkService implements FlinkService { try (var clusterClient = getClusterClient(config)) { uri = URI.create(clusterClient.getWebInterfaceURL()); } catch (Exception ex) { - throw new RuntimeException(ex); + throw new FlinkRuntimeException(ex); } SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), uri.getPort()); Socket socket = new Socket(); @@ -660,7 +660,8 @@ public abstract class AbstractFlinkService implements FlinkService { conf, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress)); } - private JarRunResponseBody runJar( + @VisibleForTesting + protected void runJar( JobSpec job, JobID jobID, JarUploadResponseBody response, @@ -688,7 +689,7 @@ public abstract class AbstractFlinkService implements FlinkService { ? conf.toMap() : null); LOG.info("Submitting job: {} to session cluster.", jobID.toHexString()); - return clusterClient + clusterClient .sendRequest(headers, parameters, runRequestBody) .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS); } catch (Exception e) { @@ -716,8 +717,7 @@ public abstract class AbstractFlinkService implements FlinkService { operatorConfig.getFlinkServiceHostOverride(), ExternalServiceDecorator.getNamespacedExternalServiceName( clusterId, namespace)); - try (RestClient restClient = new RestClient(conf, executorService)) { - // TODO add method in flink#RestClusterClient to support upload jar. 
+ try (var restClient = getRestClient(conf)) { return restClient .sendRequest( host, @@ -735,6 +735,11 @@ public abstract class AbstractFlinkService implements FlinkService { } } + @VisibleForTesting + protected RestClient getRestClient(Configuration conf) throws ConfigurationException { + return new RestClient(conf, executorService); + } + private String findJarURI(JobSpec jobSpec) { if (jobSpec.getJarURI() != null) { return jobSpec.getJarURI(); @@ -743,8 +748,7 @@ public abstract class AbstractFlinkService implements FlinkService { } } - @VisibleForTesting - protected void deleteJar(Configuration conf, String jarId) { + private void deleteJar(Configuration conf, String jarId) { LOG.debug("Deleting the jar: {}", jarId); try (var clusterClient = getClusterClient(conf)) { JarDeleteHeaders headers = JarDeleteHeaders.getInstance(); @@ -910,7 +914,7 @@ public abstract class AbstractFlinkService implements FlinkService { } } - public TaskManagersInfo getTaskManagersInfo(Configuration conf) throws Exception { + private TaskManagersInfo getTaskManagersInfo(Configuration conf) throws Exception { try (var clusterClient = getClusterClient(conf)) { return clusterClient .sendRequest( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index 79e8f71c..a3b1b101 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -292,7 +292,10 @@ public class TestUtils extends BaseTestUtils { } public static Stream<Arguments> flinkVersions() { - return Stream.of(arguments(FlinkVersion.v1_14), arguments(FlinkVersion.v1_15)); + return Stream.of( + arguments(FlinkVersion.v1_14), + arguments(FlinkVersion.v1_15), + arguments(FlinkVersion.v1_17)); } public static FlinkDeployment createCanaryDeployment() { 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java index 413a6943..05d7e5cb 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java @@ -39,6 +39,9 @@ import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.util.function.TriFunction; +import lombok.Getter; +import lombok.Setter; + import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -48,17 +51,24 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Supplier; -/** Testing ClusterClient used implementation. */ +/** Testing ClusterClient implementation. 
*/ public class TestingClusterClient<T> extends RestClusterClient<T> { + @Setter private Function<JobID, CompletableFuture<Acknowledge>> cancelFunction = ignore -> CompletableFuture.completedFuture(Acknowledge.get()); + + @Setter private TriFunction<JobID, Boolean, String, CompletableFuture<String>> stopWithSavepointFunction = (ignore1, ignore2, savepointPath) -> CompletableFuture.completedFuture(savepointPath); + + @Setter private TriFunction<JobID, SavepointFormatType, String, CompletableFuture<String>> stopWithSavepointFormat; + + @Setter private TriFunction< MessageHeaders<?, ?, ?>, MessageParameters, @@ -68,17 +78,19 @@ public class TestingClusterClient<T> extends RestClusterClient<T> { (ignore1, ignore2, ignore) -> CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + @Setter private Supplier<CompletableFuture<Collection<JobStatusMessage>>> listJobsFunction = () -> { throw new UnsupportedOperationException(); }; + @Setter private Function<JobID, CompletableFuture<JobResult>> requestResultFunction = jobID -> CompletableFuture.completedFuture( new JobResult.Builder().jobId(jobID).netRuntime(1).build()); - private final T clusterId; + @Getter private final T clusterId; public TestingClusterClient(Configuration configuration, T clusterId) throws Exception { super(configuration, clusterId, (c, e) -> new StandaloneClientHAServices("localhost")); @@ -89,47 +101,6 @@ public class TestingClusterClient<T> extends RestClusterClient<T> { this(configuration, (T) configuration.get(KubernetesConfigOptions.CLUSTER_ID)); } - public void setCancelFunction(Function<JobID, CompletableFuture<Acknowledge>> cancelFunction) { - this.cancelFunction = cancelFunction; - } - - public void setStopWithSavepointFunction( - TriFunction<JobID, Boolean, String, CompletableFuture<String>> - stopWithSavepointFunction) { - this.stopWithSavepointFunction = stopWithSavepointFunction; - } - - public void setStopWithSavepointFormat( - TriFunction<JobID, SavepointFormatType, String, 
CompletableFuture<String>> - stopWithSavepointFormat) { - this.stopWithSavepointFormat = stopWithSavepointFormat; - } - - public void setRequestProcessor( - TriFunction< - MessageHeaders<?, ?, ?>, - MessageParameters, - RequestBody, - CompletableFuture<ResponseBody>> - requestProcessor) { - this.requestProcessor = requestProcessor; - } - - public void setListJobsFunction( - Supplier<CompletableFuture<Collection<JobStatusMessage>>> listJobsFunction) { - this.listJobsFunction = listJobsFunction; - } - - public void setRequestResultFunction( - Function<JobID, CompletableFuture<JobResult>> requestResultFunction) { - this.requestResultFunction = requestResultFunction; - } - - @Override - public T getClusterId() { - return clusterId; - } - @Override public Configuration getFlinkConfiguration() { throw new UnsupportedOperationException(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingRestClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingRestClient.java new file mode 100644 index 00000000..49558859 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingRestClient.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.FileUpload; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; +import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.concurrent.Executors; +import org.apache.flink.util.function.TriFunction; + +import lombok.Setter; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** Testing RestClient implementation. */ +public class TestingRestClient extends RestClient { + + @Setter + private TriFunction< + MessageHeaders<?, ?, ?>, + MessageParameters, + RequestBody, + CompletableFuture<ResponseBody>> + requestProcessor = + (ignore1, ignore2, ignore) -> + CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + + public TestingRestClient(Configuration conf) throws ConfigurationException { + super(conf, Executors.directExecutor()); + } + + @Override + public < + M extends MessageHeaders<R, P, U>, + U extends MessageParameters, + R extends RequestBody, + P extends ResponseBody> + CompletableFuture<P> sendRequest( + String targetAddress, + int targetPort, + M messageHeaders, + U messageParameters, + R request, + Collection<FileUpload> fileUploads, + RestAPIVersion<? 
extends RestAPIVersion<?>> apiVersion) { + return (CompletableFuture<P>) + requestProcessor.apply(messageHeaders, messageParameters, request); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java new file mode 100644 index 00000000..c469679d --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java @@ -0,0 +1,873 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.service; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.TestingClusterClient; +import org.apache.flink.kubernetes.operator.TestingRestClient; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; +import org.apache.flink.kubernetes.operator.api.spec.JobSpec; +import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; +import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.api.status.JobStatus; +import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType; +import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; +import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException; +import 
org.apache.flink.kubernetes.operator.observer.SavepointFetchResult; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; +import org.apache.flink.runtime.rest.handler.async.TriggerResponse; +import org.apache.flink.runtime.rest.messages.DashboardConfiguration; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.Metric; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; +import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration; +import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders; +import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody; +import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders; +import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody; +import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.concurrent.Executors; +import org.apache.flink.util.function.TriFunction; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import io.fabric8.kubernetes.api.model.DeletionPropagation; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; +import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** {@link FlinkService} unit tests. */ +@EnableKubernetesMockClient(crud = true) +public class AbstractFlinkServiceTest { + + @TempDir Path tempDir; + File testJar; + + private KubernetesClient client; + private final Configuration configuration = new Configuration(); + + private final FlinkConfigManager configManager = new FlinkConfigManager(configuration); + private FlinkOperatorConfiguration operatorConfig; + private ExecutorService executorService; + + private ArtifactManager artifactManager; + + @BeforeEach + public void setup() { + configuration.set(KubernetesConfigOptions.CLUSTER_ID, TestUtils.TEST_DEPLOYMENT_NAME); + configuration.set(KubernetesConfigOptions.NAMESPACE, TestUtils.TEST_NAMESPACE); + configuration.set(FLINK_VERSION, FlinkVersion.v1_18); + operatorConfig = FlinkOperatorConfiguration.fromConfiguration(configuration); + executorService = Executors.newDirectExecutorService(); + testJar = tempDir.resolve("test.jar").toFile(); + artifactManager = + new ArtifactManager(configManager) { + @Override + public File fetch( + String jarURI, Configuration flinkConfiguration, String targetDirStr) + throws IOException { + Files.writeString(testJar.toPath(), "test"); + return testJar; + } + }; + } + + @ParameterizedTest + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void sessionJobSubmissionTest(FlinkVersion flinkVersion) throws Exception { + var jarRuns = new 
ArrayList<JarRunRequestBody>(); + var flinkService = + getTestingService( + (h, p, b) -> { + if (b instanceof JarRunRequestBody) { + jarRuns.add((JarRunRequestBody) b); + return CompletableFuture.completedFuture(null); + } else if (h instanceof JarUploadHeaders) { + return CompletableFuture.completedFuture( + new JarUploadResponseBody("test")); + } else if (h instanceof JarDeleteHeaders) { + return CompletableFuture.completedFuture(null); + } + + throw new UnsupportedOperationException("Unknown request"); + }); + var session = TestUtils.buildSessionCluster(flinkVersion); + session.getStatus() + .getReconciliationStatus() + .serializeAndSetLastReconciledSpec(session.getSpec(), session); + + var job = TestUtils.buildSessionJob(); + var deployConf = configManager.getSessionJobConfig(session, job.getSpec()); + flinkService.submitJobToSessionCluster(job.getMetadata(), job.getSpec(), deployConf, null); + + // Make sure that deploy conf was passed to jar run + if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_16)) { + assertEquals(deployConf.toMap(), jarRuns.get(0).getFlinkConfiguration().toMap()); + } else { + assertTrue(jarRuns.get(0).getFlinkConfiguration().toMap().isEmpty()); + } + } + + @Test + public void jarRunErrorHandlingTest() throws Exception { + List<JarRunRequestBody> jarRuns = new ArrayList<>(); + AtomicBoolean deleted = new AtomicBoolean(false); + var flinkService = + getTestingService( + (h, p, b) -> { + if (b instanceof JarRunRequestBody) { + jarRuns.add((JarRunRequestBody) b); + return CompletableFuture.failedFuture( + new Exception("RunException")); + } else if (h instanceof JarDeleteHeaders) { + deleted.set(true); + return CompletableFuture.failedFuture( + new Exception("DeleteException")); + } + + fail(); + return null; + }); + + var job = TestUtils.buildSessionJob(); + var jobId = new JobID(); + + assertThrows( + FlinkRuntimeException.class, + () -> + flinkService.runJar( + job.getSpec().getJob(), + jobId, + new JarUploadResponseBody("test"), + 
configuration, + null)); + assertEquals(jobId, jarRuns.get(0).getJobId()); + assertTrue(deleted.get()); + } + + private TestingService getTestingService( + TriFunction< + MessageHeaders<?, ?, ?>, + MessageParameters, + RequestBody, + CompletableFuture<ResponseBody>> + requestProcessor) + throws Exception { + var testingClusterClient = new TestingClusterClient<String>(configuration); + testingClusterClient.setRequestProcessor(requestProcessor); + var testingRestClient = new TestingRestClient(configuration); + testingRestClient.setRequestProcessor(requestProcessor); + return new TestingService(testingClusterClient, testingRestClient); + } + + @Test + public void cancelJobWithStatelessUpgradeModeTest() throws Exception { + final TestingClusterClient<String> testingClusterClient = + new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); + final CompletableFuture<JobID> cancelFuture = new CompletableFuture<>(); + testingClusterClient.setCancelFunction( + jobID -> { + cancelFuture.complete(jobID); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + + var flinkService = new TestingService(testingClusterClient); + + JobID jobID = JobID.generate(); + FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + JobStatus jobStatus = deployment.getStatus().getJobStatus(); + jobStatus.setJobId(jobID.toHexString()); + ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); + + deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); + deployment.getStatus().getJobStatus().setState("RUNNING"); + flinkService.cancelJob( + deployment, + UpgradeMode.STATELESS, + configManager.getObserveConfig(deployment), + false); + assertTrue(cancelFuture.isDone()); + assertEquals(jobID, cancelFuture.get()); + assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void cancelJobWithSavepointUpgradeModeTest(boolean 
deleteAfterSavepoint) + throws Exception { + var testingClusterClient = + new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); + CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture = + new CompletableFuture<>(); + var savepointPath = "file:///path/of/svp-1"; + configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); + testingClusterClient.setStopWithSavepointFunction( + (jobID, advanceToEndOfEventTime, savepointDir) -> { + stopWithSavepointFuture.complete( + new Tuple3<>(jobID, advanceToEndOfEventTime, savepointDir)); + return CompletableFuture.completedFuture(savepointPath); + }); + + var flinkService = new TestingService(testingClusterClient); + + JobID jobID = JobID.generate(); + FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + deployment + .getSpec() + .getFlinkConfiguration() + .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath); + deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); + JobStatus jobStatus = deployment.getStatus().getJobStatus(); + jobStatus.setJobId(jobID.toHexString()); + jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name()); + ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); + + flinkService.cancelJob( + deployment, + UpgradeMode.SAVEPOINT, + configManager.getObserveConfig(deployment), + deleteAfterSavepoint); + assertTrue(stopWithSavepointFuture.isDone()); + assertEquals(jobID, stopWithSavepointFuture.get().f0); + assertFalse(stopWithSavepointFuture.get().f1); + assertEquals(savepointPath, stopWithSavepointFuture.get().f2); + assertEquals(savepointPath, jobStatus.getSavepointInfo().getLastSavepoint().getLocation()); + + assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name()); + assertEquals( + deployment.getStatus().getJobManagerDeploymentStatus(), + deleteAfterSavepoint + ? 
JobManagerDeploymentStatus.MISSING + : JobManagerDeploymentStatus.READY); + if (deleteAfterSavepoint) { + assertEquals(List.of(deployment.getMetadata()), flinkService.deleted); + } else { + assertTrue(flinkService.deleted.isEmpty()); + } + } + + @Test + public void cancelJobWithLastStateUpgradeModeTest() throws Exception { + var deployment = TestUtils.buildApplicationCluster(); + ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); + var testingClusterClient = + new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); + var flinkService = new TestingService(testingClusterClient); + + JobID jobID = JobID.generate(); + JobStatus jobStatus = deployment.getStatus().getJobStatus(); + jobStatus.setJobId(jobID.toHexString()); + + flinkService.cancelJob( + deployment, + UpgradeMode.LAST_STATE, + configManager.getObserveConfig(deployment), + false); + assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); + } + + @Test + public void deletionPropagationTest() { + var propagation = new ArrayList<DeletionPropagation>(); + TestingService flinkService = + new TestingService(null) { + @Override + protected void deleteClusterInternal( + ObjectMeta meta, + Configuration conf, + boolean deleteHaData, + DeletionPropagation deletionPropagation) { + propagation.add(deletionPropagation); + } + }; + + flinkService.deleteClusterDeployment( + new ObjectMeta(), new FlinkDeploymentStatus(), configuration, true); + assertEquals(DeletionPropagation.FOREGROUND, propagation.get(0)); + + configuration.set( + KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION, + DeletionPropagation.BACKGROUND); + operatorConfig = FlinkOperatorConfiguration.fromConfiguration(configuration); + + flinkService = + new TestingService(null) { + @Override + protected void deleteClusterInternal( + ObjectMeta meta, + Configuration conf, + boolean deleteHaData, + DeletionPropagation deletionPropagation) { + propagation.add(deletionPropagation); + } + }; + 
flinkService.deleteClusterDeployment( + new ObjectMeta(), new FlinkDeploymentStatus(), configuration, true); + assertEquals(DeletionPropagation.BACKGROUND, propagation.get(1)); + } + + @Test + public void triggerSavepointTest() throws Exception { + CompletableFuture<Tuple3<JobID, String, Boolean>> triggerSavepointFuture = + new CompletableFuture<>(); + String savepointPath = "file:///path/of/svp"; + configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); + var flinkService = + getTestingService( + (headers, parameters, requestBody) -> { + triggerSavepointFuture.complete( + new Tuple3<>( + ((SavepointTriggerMessageParameters) parameters) + .jobID.getValue(), + ((SavepointTriggerRequestBody) requestBody) + .getTargetDirectory() + .get(), + ((SavepointTriggerRequestBody) requestBody) + .isCancelJob())); + return CompletableFuture.completedFuture( + new TriggerResponse(new TriggerId())); + }); + + var jobID = JobID.generate(); + var flinkDeployment = TestUtils.buildApplicationCluster(); + ReconciliationUtils.updateStatusForDeployedSpec(flinkDeployment, new Configuration()); + JobStatus jobStatus = new JobStatus(); + jobStatus.setJobId(jobID.toString()); + flinkDeployment.getStatus().setJobStatus(jobStatus); + flinkService.triggerSavepoint( + flinkDeployment.getStatus().getJobStatus().getJobId(), + SavepointTriggerType.MANUAL, + flinkDeployment.getStatus().getJobStatus().getSavepointInfo(), + configuration); + assertTrue(triggerSavepointFuture.isDone()); + assertEquals(jobID, triggerSavepointFuture.get().f0); + assertEquals(savepointPath, triggerSavepointFuture.get().f1); + assertFalse(triggerSavepointFuture.get().f2); + } + + @Test + public void disposeSavepointTest() throws Exception { + var savepointPath = "file:///path/of/svp"; + var tested = new AtomicBoolean(false); + configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); + var flinkService = + getTestingService( + (h, p, r) -> { + if (r instanceof SavepointDisposalRequest) 
{ + var dr = (SavepointDisposalRequest) r; + assertEquals(savepointPath, dr.getSavepointPath()); + tested.set(true); + return CompletableFuture.completedFuture(null); + } + fail("unknown request"); + return null; + }); + flinkService.disposeSavepoint(savepointPath, configuration); + assertTrue(tested.get()); + } + + @Test + public void nativeSavepointFormatTest() throws Exception { + final TestingClusterClient<String> testingClusterClient = + new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); + final String savepointPath = "file:///path/of/svp"; + final CompletableFuture<Tuple4<JobID, String, Boolean, SavepointFormatType>> + triggerSavepointFuture = new CompletableFuture<>(); + configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); + testingClusterClient.setRequestProcessor( + (headers, parameters, requestBody) -> { + triggerSavepointFuture.complete( + new Tuple4<>( + ((SavepointTriggerMessageParameters) parameters) + .jobID.getValue(), + ((SavepointTriggerRequestBody) requestBody) + .getTargetDirectory() + .get(), + ((SavepointTriggerRequestBody) requestBody).isCancelJob(), + ((SavepointTriggerRequestBody) requestBody).getFormatType())); + return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId())); + }); + final CompletableFuture<Tuple3<JobID, SavepointFormatType, String>> + stopWithSavepointFuture = new CompletableFuture<>(); + testingClusterClient.setStopWithSavepointFormat( + (id, formatType, savepointDir) -> { + stopWithSavepointFuture.complete(new Tuple3<>(id, formatType, savepointDir)); + return CompletableFuture.completedFuture(savepointPath); + }); + + var flinkService = new TestingService(testingClusterClient); + + final JobID jobID = JobID.generate(); + final FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + deployment + .getSpec() + .getFlinkConfiguration() + .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath); + 
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); + JobStatus jobStatus = deployment.getStatus().getJobStatus(); + jobStatus.setJobId(jobID.toHexString()); + jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name()); + ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); + + jobStatus.setJobId(jobID.toString()); + deployment.getStatus().setJobStatus(jobStatus); + flinkService.triggerSavepoint( + deployment.getStatus().getJobStatus().getJobId(), + SavepointTriggerType.MANUAL, + deployment.getStatus().getJobStatus().getSavepointInfo(), + new Configuration(configuration) + .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE)); + assertTrue(triggerSavepointFuture.isDone()); + assertEquals(jobID, triggerSavepointFuture.get().f0); + assertEquals(savepointPath, triggerSavepointFuture.get().f1); + assertFalse(triggerSavepointFuture.get().f2); + assertEquals(SavepointFormatType.NATIVE, triggerSavepointFuture.get().f3); + + flinkService.cancelJob( + deployment, + UpgradeMode.SAVEPOINT, + new Configuration(configManager.getObserveConfig(deployment)) + .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE), + false); + assertTrue(stopWithSavepointFuture.isDone()); + assertEquals(jobID, stopWithSavepointFuture.get().f0); + assertEquals(SavepointFormatType.NATIVE, stopWithSavepointFuture.get().f1); + assertEquals(savepointPath, stopWithSavepointFuture.get().f2); + } + + @Test + public void getLastCheckpointTest() throws Exception { + ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + var responseContainer = new ArrayList<CheckpointHistoryWrapper>(); + var flinkService = + getTestingService( + (headers, parameters, requestBody) -> { + if (headers instanceof CustomCheckpointingStatisticsHeaders) { + return CompletableFuture.completedFuture(responseContainer.get(0)); + } + fail("unknown request"); + return null; + }); + + String responseWithHistory = + 
"{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11 [...] + String responseWithoutHistory = + "{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11 [...] + String responseWithoutHistoryInternal = + "{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11 [...] 
+ + responseContainer.add( + objectMapper.readValue(responseWithHistory, CheckpointHistoryWrapper.class)); + var checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new Configuration()); + assertEquals( + "file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96", + checkpointOpt.get().getLocation()); + + responseContainer.set( + 0, objectMapper.readValue(responseWithoutHistory, CheckpointHistoryWrapper.class)); + checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new Configuration()); + assertEquals( + "file:/flink-data/savepoints/savepoint-000000-5930e5326ca7", + checkpointOpt.get().getLocation()); + + responseContainer.set( + 0, + objectMapper.readValue( + responseWithoutHistoryInternal, CheckpointHistoryWrapper.class)); + try { + flinkService.getLastCheckpoint(new JobID(), new Configuration()); + fail(); + } catch (RecoveryFailureException dpe) { + + } + } + + @Test + public void fetchSavepointInfoTest() throws Exception { + var triggerId = new TriggerId(); + var jobId = new JobID(); + var response = new AtomicReference<AsynchronousOperationResult<SavepointInfo>>(); + var flinkService = + getTestingService( + (h, p, r) -> { + if (p instanceof SavepointStatusMessageParameters) { + var params = (SavepointStatusMessageParameters) p; + assertEquals(jobId, params.jobIdPathParameter.getValue()); + assertEquals(triggerId, params.triggerIdPathParameter.getValue()); + if (response.get() == null) { + return CompletableFuture.failedFuture(new Exception("fail")); + } + return CompletableFuture.completedFuture(response.get()); + } + fail("unknown request"); + return null; + }); + + response.set(AsynchronousOperationResult.completed(new SavepointInfo("l", null))); + assertEquals( + SavepointFetchResult.completed("l"), + flinkService.fetchSavepointInfo( + triggerId.toString(), jobId.toString(), configuration)); + + response.set(AsynchronousOperationResult.inProgress()); + assertEquals( + SavepointFetchResult.pending(), + 
flinkService.fetchSavepointInfo( + triggerId.toString(), jobId.toString(), configuration)); + + response.set( + AsynchronousOperationResult.completed( + new SavepointInfo( + null, new SerializedThrowable(new Exception("testErr"))))); + assertTrue( + flinkService + .fetchSavepointInfo(triggerId.toString(), jobId.toString(), configuration) + .getError() + .contains("testErr")); + + response.set(null); + assertTrue( + flinkService + .fetchSavepointInfo(triggerId.toString(), jobId.toString(), configuration) + .getError() + .contains("fail")); + } + + @Test + public void removeOperatorConfigTest() { + var key = "kubernetes.operator.meyKey"; + var deployConfig = Configuration.fromMap(Map.of("kubernetes.operator.meyKey", "v")); + var newConf = AbstractFlinkService.removeOperatorConfigs(deployConfig); + assertFalse(newConf.containsKey(key)); + } + + @Test + public void getMetricsTest() throws Exception { + var jobId = new JobID(); + var metricNames = List.of("m1", "m2"); + var flinkService = + getTestingService( + (h, p, r) -> { + if (p instanceof JobMetricsMessageParameters) { + var jmmp = ((JobMetricsMessageParameters) p); + assertEquals(jobId, jmmp.jobPathParameter.getValue()); + var output = + jmmp.metricsFilterParameter.getValue().stream() + .map(s -> new Metric(s, s)) + .collect(Collectors.toList()); + return CompletableFuture.completedFuture( + new MetricCollectionResponseBody(output)); + } + fail("unknown request"); + return null; + }); + assertEquals( + Map.of("m1", "m1", "m2", "m2"), + flinkService.getMetrics(configuration, jobId.toHexString(), metricNames)); + } + + @Test + public void getClusterInfoTest() throws Exception { + var config = new CustomDashboardConfiguration(); + var testVersion = "testVersion"; + var testRevision = "testRevision"; + config.setFlinkVersion(testVersion); + config.setFlinkRevision(testRevision); + + var tmInfo = + new TaskManagerInfo( + ResourceID.generate(), + "", + 0, + 0, + 0L, + 0, + 0, + ResourceProfile.UNKNOWN, + 
ResourceProfile.UNKNOWN, + new HardwareDescription(1, 0L, 0L, 0L), + new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), + null); + var tmsInfo = new TaskManagersInfo(List.of(tmInfo)); + + var flinkService = + getTestingService( + (h, p, r) -> { + if (h instanceof CustomDashboardConfigurationHeaders) { + return CompletableFuture.completedFuture(config); + } else if (h instanceof TaskManagersHeaders) { + return CompletableFuture.completedFuture(tmsInfo); + } + fail("unknown request"); + return null; + }); + + var conf = new Configuration(); + conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000)); + conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000)); + + assertEquals( + Map.of( + DashboardConfiguration.FIELD_NAME_FLINK_VERSION, + testVersion, + DashboardConfiguration.FIELD_NAME_FLINK_REVISION, + testRevision, + AbstractFlinkService.FIELD_NAME_TOTAL_CPU, + "2.0", + AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, + "" + MemorySize.ofMebiBytes(1000).getBytes() * 2), + flinkService.getClusterInfo(conf)); + } + + @Test + public void effectiveStatusTest() { + JobDetails allRunning = + getJobDetails( + org.apache.flink.api.common.JobStatus.RUNNING, + Tuple2.of(ExecutionState.RUNNING, 4)); + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING, + AbstractFlinkService.getEffectiveStatus(allRunning)); + + JobDetails allRunningOrFinished = + getJobDetails( + org.apache.flink.api.common.JobStatus.RUNNING, + Tuple2.of(ExecutionState.RUNNING, 2), + Tuple2.of(ExecutionState.FINISHED, 2)); + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING, + AbstractFlinkService.getEffectiveStatus(allRunningOrFinished)); + + JobDetails allRunningOrScheduled = + getJobDetails( + org.apache.flink.api.common.JobStatus.RUNNING, + Tuple2.of(ExecutionState.RUNNING, 2), + Tuple2.of(ExecutionState.SCHEDULED, 2)); + assertEquals( + org.apache.flink.api.common.JobStatus.CREATED, + 
AbstractFlinkService.getEffectiveStatus(allRunningOrScheduled)); + + JobDetails allFinished = + getJobDetails( + org.apache.flink.api.common.JobStatus.FINISHED, + Tuple2.of(ExecutionState.FINISHED, 4)); + assertEquals( + org.apache.flink.api.common.JobStatus.FINISHED, + AbstractFlinkService.getEffectiveStatus(allFinished)); + } + + private JobDetails getJobDetails( + org.apache.flink.api.common.JobStatus status, + Tuple2<ExecutionState, Integer>... tasksPerState) { + int[] countPerState = new int[ExecutionState.values().length]; + for (var taskPerState : tasksPerState) { + countPerState[taskPerState.f0.ordinal()] = taskPerState.f1; + } + int numTasks = Arrays.stream(countPerState).sum(); + return new JobDetails( + new JobID(), + "test-job", + System.currentTimeMillis(), + -1, + 0, + status, + System.currentTimeMillis(), + countPerState, + numTasks); + } + + @Test + public void isJobManagerReadyTest() throws Exception { + AtomicReference<String> url = new AtomicReference<>(); + var clusterClient = + new TestingClusterClient<String>(configuration) { + @Override + public String getWebInterfaceURL() { + return url.get(); + } + }; + var flinkService = new TestingService(clusterClient); + + assertThrows( + FlinkRuntimeException.class, + () -> flinkService.isJobManagerPortReady(configuration)); + + int port = 6868; + url.set("http://127.0.0.1:" + port); + + assertFalse(flinkService.isJobManagerPortReady(configuration)); + try (var socket = new ServerSocket(port)) { + assertTrue(flinkService.isJobManagerPortReady(configuration)); + } + } + + class TestingService extends AbstractFlinkService { + + RestClusterClient<String> clusterClient; + RestClient restClient; + List<ObjectMeta> deleted = new ArrayList<>(); + + Map<Tuple2<String, String>, PodList> jmPods = new HashMap<>(); + Map<Tuple2<String, String>, PodList> tmPods = new HashMap<>(); + + TestingService(RestClusterClient<String> clusterClient) { + this(clusterClient, null); + } + + 
TestingService(RestClusterClient<String> clusterClient, RestClient restClient) { + super( + client, + AbstractFlinkServiceTest.this.artifactManager, + AbstractFlinkServiceTest.this.executorService, + AbstractFlinkServiceTest.this.operatorConfig); + this.clusterClient = clusterClient; + this.restClient = restClient; + } + + @Override + public RestClusterClient<String> getClusterClient(Configuration config) { + return clusterClient; + } + + @Override + protected RestClient getRestClient(Configuration conf) throws ConfigurationException { + return restClient; + } + + @Override + protected PodList getJmPodList(String namespace, String clusterId) { + return jmPods.getOrDefault(Tuple2.of(namespace, clusterId), new PodList()); + } + + @Override + protected PodList getTmPodList(String namespace, String clusterId) { + return tmPods.getOrDefault(Tuple2.of(namespace, clusterId), new PodList()); + } + + @Override + protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) { + throw new UnsupportedOperationException(); + } + + @Override + public void submitSessionCluster(Configuration conf) { + throw new UnsupportedOperationException(); + } + + @Override + public void cancelJob( + FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean scale(FlinkResourceContext<?> resourceContext) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean scalingCompleted(FlinkResourceContext<?> resourceContext) { + throw new UnsupportedOperationException(); + } + + @Override + protected void deleteClusterInternal( + ObjectMeta meta, + Configuration conf, + boolean deleteHaData, + DeletionPropagation deletionPropagation) { + deleted.add(meta); + } + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java index bb78814c..fe2d1944 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java @@ -18,62 +18,41 @@ package org.apache.flink.kubernetes.operator.service; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingClusterClient; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; -import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; -import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; -import org.apache.flink.kubernetes.operator.api.status.JobStatus; -import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType; import 
org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext; -import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.rest.handler.async.TriggerResponse; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.JobPlanInfo; -import org.apache.flink.runtime.rest.messages.TriggerId; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody; +import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; -import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; -import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; -import org.apache.flink.runtime.rest.util.RestMapperUtils; -import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody; -import 
org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody; import org.apache.flink.util.concurrent.Executors; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - import io.fabric8.kubernetes.api.model.DeletionPropagation; -import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; @@ -83,18 +62,16 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT; -import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -126,93 +103,21 @@ public class NativeFlinkServiceTest { } @Test - public void testCancelJobWithStatelessUpgradeMode() throws Exception { - final TestingClusterClient<String> testingClusterClient = - new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); - final CompletableFuture<JobID> 
cancelFuture = new CompletableFuture<>(); - testingClusterClient.setCancelFunction( - jobID -> { - cancelFuture.complete(jobID); - return CompletableFuture.completedFuture(Acknowledge.get()); - }); - - final FlinkService flinkService = createFlinkService(testingClusterClient); - - JobID jobID = JobID.generate(); - FlinkDeployment deployment = TestUtils.buildApplicationCluster(); - JobStatus jobStatus = deployment.getStatus().getJobStatus(); - jobStatus.setJobId(jobID.toHexString()); - ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); - - deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); - deployment.getStatus().getJobStatus().setState("RUNNING"); - flinkService.cancelJob( - deployment, UpgradeMode.STATELESS, configManager.getObserveConfig(deployment)); - assertTrue(cancelFuture.isDone()); - assertEquals(jobID, cancelFuture.get()); - assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); - } - - @ParameterizedTest - @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") - public void testCancelJobWithSavepointUpgradeMode(FlinkVersion flinkVersion) throws Exception { - final TestingClusterClient<String> testingClusterClient = - new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); - final CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture = - new CompletableFuture<>(); - final String savepointPath = "file:///path/of/svp-1"; - configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); - testingClusterClient.setStopWithSavepointFunction( - (jobID, advanceToEndOfEventTime, savepointDir) -> { - stopWithSavepointFuture.complete( - new Tuple3<>(jobID, advanceToEndOfEventTime, savepointDir)); - return CompletableFuture.completedFuture(savepointPath); - }); - - final FlinkService flinkService = createFlinkService(testingClusterClient); - - JobID jobID = JobID.generate(); - FlinkDeployment deployment = 
TestUtils.buildApplicationCluster(); - deployment - .getSpec() - .getFlinkConfiguration() - .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath); - deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); - JobStatus jobStatus = deployment.getStatus().getJobStatus(); - jobStatus.setJobId(jobID.toHexString()); - jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name()); - ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); - - deployment.getSpec().setFlinkVersion(flinkVersion); - flinkService.cancelJob( - deployment, UpgradeMode.SAVEPOINT, configManager.getObserveConfig(deployment)); - assertTrue(stopWithSavepointFuture.isDone()); - assertEquals(jobID, stopWithSavepointFuture.get().f0); - assertFalse(stopWithSavepointFuture.get().f1); - assertEquals(savepointPath, stopWithSavepointFuture.get().f2); - assertEquals(savepointPath, jobStatus.getSavepointInfo().getLastSavepoint().getLocation()); - - assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name()); - if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) { - assertEquals( - deployment.getStatus().getJobManagerDeploymentStatus(), - JobManagerDeploymentStatus.READY); - } else { - assertEquals( - deployment.getStatus().getJobManagerDeploymentStatus(), - JobManagerDeploymentStatus.MISSING); - } - } - - @Test - public void testCancelJobWithLastStateUpgradeMode() throws Exception { - FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + public void testDeleteClusterInternal() { + var deployment = TestUtils.buildApplicationCluster(); ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); - final TestingClusterClient<String> testingClusterClient = - new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); - final FlinkService flinkService = createFlinkService(testingClusterClient); + var flinkService = createFlinkService(null); - 
client.resource(createTestingDeployment()).create(); + var dep = + new DeploymentBuilder() + .withNewMetadata() + .withName(TestUtils.TEST_DEPLOYMENT_NAME) + .withNamespace(TestUtils.TEST_NAMESPACE) + .endMetadata() + .withNewSpec() + .endSpec() + .build(); + client.resource(dep).create(); assertNotNull( client.apps() @@ -221,13 +126,11 @@ public class NativeFlinkServiceTest { .withName(TestUtils.TEST_DEPLOYMENT_NAME) .get()); - JobID jobID = JobID.generate(); - JobStatus jobStatus = deployment.getStatus().getJobStatus(); - jobStatus.setJobId(jobID.toHexString()); - - flinkService.cancelJob( - deployment, UpgradeMode.LAST_STATE, configManager.getObserveConfig(deployment)); - assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); + flinkService.deleteClusterInternal( + deployment.getMetadata(), + configManager.getObserveConfig(deployment), + false, + DeletionPropagation.FOREGROUND); assertNull( client.apps() .deployments() @@ -236,151 +139,31 @@ public class NativeFlinkServiceTest { .get()); } - @Test - public void testTriggerSavepoint() throws Exception { - final TestingClusterClient<String> testingClusterClient = - new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); - final CompletableFuture<Tuple3<JobID, String, Boolean>> triggerSavepointFuture = - new CompletableFuture<>(); - final String savepointPath = "file:///path/of/svp"; - configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); - testingClusterClient.setRequestProcessor( - (headers, parameters, requestBody) -> { - triggerSavepointFuture.complete( - new Tuple3<>( - ((SavepointTriggerMessageParameters) parameters) - .jobID.getValue(), - ((SavepointTriggerRequestBody) requestBody) - .getTargetDirectory() - .get(), - ((SavepointTriggerRequestBody) requestBody).isCancelJob())); - return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId())); - }); - - final FlinkService flinkService = createFlinkService(testingClusterClient); - - final JobID 
jobID = JobID.generate(); - final FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster(); - ReconciliationUtils.updateStatusForDeployedSpec(flinkDeployment, new Configuration()); - JobStatus jobStatus = new JobStatus(); - jobStatus.setJobId(jobID.toString()); - flinkDeployment.getStatus().setJobStatus(jobStatus); - flinkService.triggerSavepoint( - flinkDeployment.getStatus().getJobStatus().getJobId(), - SavepointTriggerType.MANUAL, - flinkDeployment.getStatus().getJobStatus().getSavepointInfo(), - configuration); - assertTrue(triggerSavepointFuture.isDone()); - assertEquals(jobID, triggerSavepointFuture.get().f0); - assertEquals(savepointPath, triggerSavepointFuture.get().f1); - assertFalse(triggerSavepointFuture.get().f2); - } - - @Test - public void testGetLastCheckpoint() throws Exception { - ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); - var testingClusterClient = - new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); - - String responseWithHistory = - "{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11 [...] 
- String responseWithoutHistory = - "{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11 [...] - String responseWithoutHistoryInternal = - "{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":11 [...] - - var responseContainer = new ArrayList<CheckpointHistoryWrapper>(); - - testingClusterClient.setRequestProcessor( - (headers, parameters, requestBody) -> { - if (headers instanceof CustomCheckpointingStatisticsHeaders) { - return CompletableFuture.completedFuture(responseContainer.get(0)); + @ParameterizedTest + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void testDeleteOnSavepointBefore1_15(FlinkVersion flinkVersion) throws Exception { + AtomicBoolean tested = new AtomicBoolean(false); + var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder) { + @Override + protected void cancelJob( + FlinkDeployment deployment, + UpgradeMode upgradeMode, + Configuration conf, + boolean deleteClusterAfterSavepoint) { + assertEquals( + flinkVersion.isNewerVersionThan(FlinkVersion.v1_14) ? 
false : true, + deleteClusterAfterSavepoint); + tested.set(true); } - fail("unknown request"); - return null; - }); - - var flinkService = createFlinkService(testingClusterClient); - - responseContainer.add( - objectMapper.readValue(responseWithHistory, CheckpointHistoryWrapper.class)); - var checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new Configuration()); - assertEquals( - "file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96", - checkpointOpt.get().getLocation()); - - responseContainer.set( - 0, objectMapper.readValue(responseWithoutHistory, CheckpointHistoryWrapper.class)); - checkpointOpt = flinkService.getLastCheckpoint(new JobID(), new Configuration()); - assertEquals( - "file:/flink-data/savepoints/savepoint-000000-5930e5326ca7", - checkpointOpt.get().getLocation()); - - responseContainer.set( - 0, - objectMapper.readValue( - responseWithoutHistoryInternal, CheckpointHistoryWrapper.class)); - try { - flinkService.getLastCheckpoint(new JobID(), new Configuration()); - fail(); - } catch (RecoveryFailureException dpe) { - - } - } - - @Test - public void testGetLastSavepointRestCompatibility() throws JsonProcessingException { - ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); - String flink14Response = - "{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0},\"summary\":{\"state_size\":{\"min\":8646,\"max\":25626,\"avg\":17136},\"end_to_end_duration\":{\"min\":95,\"max\":420,\"avg\":257},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0},\"processed_data\":{\"min\":0,\"max\":70,\"avg\":35},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0}},\"latest\":{\"completed\":{\"@class\":\"completed\",\"id\":1,\"status\":\"COMPLETED\",\"is_save [...] 
- String flink15Response = - "{\"counts\":{\"restored\":0,\"total\":12,\"in_progress\":0,\"completed\":3,\"failed\":9},\"summary\":{\"checkpointed_size\":{\"min\":4308,\"max\":16053,\"avg\":11856,\"p50\":15207,\"p90\":16053,\"p95\":16053,\"p99\":16053,\"p999\":16053},\"state_size\":{\"min\":4308,\"max\":16053,\"avg\":11856,\"p50\":15207,\"p90\":16053,\"p95\":16053,\"p99\":16053,\"p999\":16053},\"end_to_end_duration\":{\"min\":31,\"max\":117,\"avg\":61,\"p50\":36,\"p90\":117,\"p95\":117,\"p99\":117,\" [...] - - objectMapper.readValue(flink14Response, CheckpointHistoryWrapper.class); - objectMapper.readValue(flink15Response, CheckpointHistoryWrapper.class); - } - - @Test - public void testGetInProgressCheckpointsFromResponseWithoutHistoryDetails() - throws JsonProcessingException { - ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); - String response = - "{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0}}"; - var checkpointHistoryWrapper = - objectMapper.readValue(response, CheckpointHistoryWrapper.class); - Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo = - assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint); - assertTrue(optionalPendingCheckpointInfo.isEmpty()); - } - - @Test - public void testGetInProgressCheckpointsWithoutHistory() { - CheckpointHistoryWrapper checkpointHistoryWrapper = new CheckpointHistoryWrapper(); - Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo = - assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint); - assertTrue(optionalPendingCheckpointInfo.isEmpty()); - } - - @Test - public void testClusterInfoRestCompatibility() throws JsonProcessingException { - ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); - - String flink13Response = - "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal 
Time\",\"timezone-offset\":0,\"flink-version\":\"1.13.6\",\"flink-revision\":\"b2ca390 @ 2022-02-03T14:54:22+01:00\",\"features\":{\"web-submit\":false}}"; - String flink14Response = - "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609 @ 2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; - - var dashboardConfiguration = - objectMapper.readValue(flink13Response, CustomDashboardConfiguration.class); - dashboardConfiguration = - objectMapper.readValue(flink14Response, CustomDashboardConfiguration.class); - } + }; - @Test - public void testRemoveOperatorConfig() { - Configuration deployConfig = createOperatorConfig(); - Configuration newConf = AbstractFlinkService.removeOperatorConfigs(deployConfig); - assertFalse(newConf.containsKey(OPERATOR_HEALTH_PROBE_PORT.key())); + flinkService.cancelJob( + TestUtils.buildApplicationCluster(flinkVersion), + UpgradeMode.SAVEPOINT, + new Configuration()); + assertTrue(tested.get()); } @Test @@ -409,209 +192,6 @@ public class NativeFlinkServiceTest { testingService.getRuntimeConfig().containsKey(OPERATOR_HEALTH_PROBE_PORT.key())); } - @Test - public void testEffectiveStatus() { - - JobDetails allRunning = - getJobDetails( - org.apache.flink.api.common.JobStatus.RUNNING, - Tuple2.of(ExecutionState.RUNNING, 4)); - assertEquals( - org.apache.flink.api.common.JobStatus.RUNNING, - AbstractFlinkService.getEffectiveStatus(allRunning)); - - JobDetails allRunningOrFinished = - getJobDetails( - org.apache.flink.api.common.JobStatus.RUNNING, - Tuple2.of(ExecutionState.RUNNING, 2), - Tuple2.of(ExecutionState.FINISHED, 2)); - assertEquals( - org.apache.flink.api.common.JobStatus.RUNNING, - AbstractFlinkService.getEffectiveStatus(allRunningOrFinished)); - - JobDetails allRunningOrScheduled = - getJobDetails( - org.apache.flink.api.common.JobStatus.RUNNING, - Tuple2.of(ExecutionState.RUNNING, 2), - 
Tuple2.of(ExecutionState.SCHEDULED, 2)); - assertEquals( - org.apache.flink.api.common.JobStatus.CREATED, - AbstractFlinkService.getEffectiveStatus(allRunningOrScheduled)); - - JobDetails allFinished = - getJobDetails( - org.apache.flink.api.common.JobStatus.FINISHED, - Tuple2.of(ExecutionState.FINISHED, 4)); - assertEquals( - org.apache.flink.api.common.JobStatus.FINISHED, - AbstractFlinkService.getEffectiveStatus(allFinished)); - } - - @Test - public void testNativeSavepointFormat() throws Exception { - final TestingClusterClient<String> testingClusterClient = - new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); - final String savepointPath = "file:///path/of/svp"; - final CompletableFuture<Tuple4<JobID, String, Boolean, SavepointFormatType>> - triggerSavepointFuture = new CompletableFuture<>(); - configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); - testingClusterClient.setRequestProcessor( - (headers, parameters, requestBody) -> { - triggerSavepointFuture.complete( - new Tuple4<>( - ((SavepointTriggerMessageParameters) parameters) - .jobID.getValue(), - ((SavepointTriggerRequestBody) requestBody) - .getTargetDirectory() - .get(), - ((SavepointTriggerRequestBody) requestBody).isCancelJob(), - ((SavepointTriggerRequestBody) requestBody).getFormatType())); - return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId())); - }); - final CompletableFuture<Tuple3<JobID, SavepointFormatType, String>> - stopWithSavepointFuture = new CompletableFuture<>(); - testingClusterClient.setStopWithSavepointFormat( - (id, formatType, savepointDir) -> { - stopWithSavepointFuture.complete(new Tuple3<>(id, formatType, savepointDir)); - return CompletableFuture.completedFuture(savepointPath); - }); - - final FlinkService flinkService = createFlinkService(testingClusterClient); - - final JobID jobID = JobID.generate(); - final FlinkDeployment deployment = TestUtils.buildApplicationCluster(); - deployment - .getSpec() - 
.getFlinkConfiguration() - .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath); - deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); - JobStatus jobStatus = deployment.getStatus().getJobStatus(); - jobStatus.setJobId(jobID.toHexString()); - jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name()); - ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); - - jobStatus.setJobId(jobID.toString()); - deployment.getStatus().setJobStatus(jobStatus); - flinkService.triggerSavepoint( - deployment.getStatus().getJobStatus().getJobId(), - SavepointTriggerType.MANUAL, - deployment.getStatus().getJobStatus().getSavepointInfo(), - new Configuration(configuration) - .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE)); - assertTrue(triggerSavepointFuture.isDone()); - assertEquals(jobID, triggerSavepointFuture.get().f0); - assertEquals(savepointPath, triggerSavepointFuture.get().f1); - assertFalse(triggerSavepointFuture.get().f2); - assertEquals(SavepointFormatType.NATIVE, triggerSavepointFuture.get().f3); - - flinkService.cancelJob( - deployment, - UpgradeMode.SAVEPOINT, - new Configuration(configManager.getObserveConfig(deployment)) - .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE)); - assertTrue(stopWithSavepointFuture.isDone()); - assertEquals(jobID, stopWithSavepointFuture.get().f0); - assertEquals(SavepointFormatType.NATIVE, stopWithSavepointFuture.get().f1); - assertEquals(savepointPath, stopWithSavepointFuture.get().f2); - } - - @Test - public void testDeletionPropagation() { - var propagation = new ArrayList<DeletionPropagation>(); - NativeFlinkService flinkService = - new NativeFlinkService( - client, null, executorService, operatorConfig, eventRecorder) { - @Override - protected void deleteClusterInternal( - ObjectMeta meta, - Configuration conf, - boolean deleteHaData, - DeletionPropagation deletionPropagation) { - 
propagation.add(deletionPropagation); - } - }; - - flinkService.deleteClusterDeployment( - new ObjectMeta(), new FlinkDeploymentStatus(), configuration, true); - assertEquals(DeletionPropagation.FOREGROUND, propagation.get(0)); - - configuration.set( - KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION, - DeletionPropagation.BACKGROUND); - - flinkService = - new NativeFlinkService( - client, - null, - executorService, - FlinkOperatorConfiguration.fromConfiguration(configuration), - eventRecorder) { - @Override - protected void deleteClusterInternal( - ObjectMeta meta, - Configuration conf, - boolean deleteHaData, - DeletionPropagation deletionPropagation) { - propagation.add(deletionPropagation); - } - }; - flinkService.deleteClusterDeployment( - new ObjectMeta(), new FlinkDeploymentStatus(), configuration, true); - assertEquals(DeletionPropagation.BACKGROUND, propagation.get(1)); - } - - @Test - public void testSendConfigOnRunJar() throws Exception { - var jarRuns = new ArrayList<JarRunRequestBody>(); - var flinkService = - new NativeFlinkService( - client, null, executorService, operatorConfig, eventRecorder) { - @Override - public RestClusterClient<String> getClusterClient(Configuration conf) - throws Exception { - var client = new TestingClusterClient<String>(conf); - client.setRequestProcessor( - (h, p, b) -> { - jarRuns.add((JarRunRequestBody) b); - return CompletableFuture.completedFuture(null); - }); - return client; - } - - @Override - protected JarUploadResponseBody uploadJar( - ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) { - return new JarUploadResponseBody("test"); - } - - @Override - protected void deleteJar(Configuration conf, String jarId) {} - }; - - var session = TestUtils.buildSessionCluster(); - session.getSpec().setFlinkVersion(FlinkVersion.v1_17); - session.getStatus() - .getReconciliationStatus() - .serializeAndSetLastReconciledSpec(session.getSpec(), session); - - var job = TestUtils.buildSessionJob(); - 
var deployConf = configManager.getSessionJobConfig(session, job.getSpec()); - flinkService.submitJobToSessionCluster(job.getMetadata(), job.getSpec(), deployConf, null); - - // Make sure that deploy conf was passed to jar run - assertEquals(deployConf.toMap(), jarRuns.get(0).getFlinkConfiguration().toMap()); - - session.getSpec().setFlinkVersion(FlinkVersion.v1_16); - session.getStatus() - .getReconciliationStatus() - .serializeAndSetLastReconciledSpec(session.getSpec(), session); - - deployConf = configManager.getSessionJobConfig(session, job.getSpec()); - flinkService.submitJobToSessionCluster(job.getMetadata(), job.getSpec(), deployConf, null); - - assertTrue(jarRuns.get(1).getFlinkConfiguration().toMap().isEmpty()); - } - @Test public void testScaling() throws Exception { var v1 = new JobVertexID(); @@ -916,6 +496,47 @@ public class NativeFlinkServiceTest { assertTrue(service.scalingCompleted(ctx)); } + @Test + public void resourceRestApiTest() throws Exception { + var testingClusterClient = new TestingClusterClient<String>(configuration); + var service = (NativeFlinkService) createFlinkService(testingClusterClient); + var jobId = new JobID(); + + var reqs = + new JobResourceRequirements( + Map.of( + new JobVertexID(), + new JobVertexResourceRequirements( + new JobVertexResourceRequirements.Parallelism(0, 2)))); + + testingClusterClient.setRequestProcessor( + (h, p, r) -> { + if (h instanceof JobResourceRequirementsHeaders) { + if (jobId.equals(((JobMessageParameters) p).jobPathParameter.getValue())) { + return CompletableFuture.completedFuture( + new JobResourceRequirementsBody(reqs)); + } + } else if (r instanceof JobResourceRequirementsBody) { + if (jobId.equals(((JobMessageParameters) p).jobPathParameter.getValue())) { + assertEquals( + Optional.of(reqs), + ((JobResourceRequirementsBody) r).asJobResourceRequirements()); + return CompletableFuture.completedFuture(null); + } + } + fail("unknown request"); + return null; + }); + + var deployment = 
TestUtils.buildApplicationCluster(); + deployment.getStatus().getJobStatus().setJobId(jobId.toString()); + assertEquals( + reqs.getJobVertexParallelisms(), + service.getVertexResources(testingClusterClient, deployment)); + service.updateVertexResources( + testingClusterClient, deployment, reqs.getJobVertexParallelisms()); + } + public static JobDetailsInfo createJobDetailsFor( List<JobDetailsInfo.JobVertexDetailsInfo> vertexInfos) { return new JobDetailsInfo( @@ -961,27 +582,7 @@ public class NativeFlinkServiceTest { } } - private JobDetails getJobDetails( - org.apache.flink.api.common.JobStatus status, - Tuple2<ExecutionState, Integer>... tasksPerState) { - int[] countPerState = new int[ExecutionState.values().length]; - for (var taskPerState : tasksPerState) { - countPerState[taskPerState.f0.ordinal()] = taskPerState.f1; - } - int numTasks = Arrays.stream(countPerState).sum(); - return new JobDetails( - new JobID(), - "test-job", - System.currentTimeMillis(), - -1, - 0, - status, - System.currentTimeMillis(), - countPerState, - numTasks); - } - - private FlinkService createFlinkService(RestClusterClient<String> clusterClient) { + private AbstractFlinkService createFlinkService(RestClusterClient<String> clusterClient) { return new NativeFlinkService( client, null, executorService, operatorConfig, eventRecorder) { @Override @@ -991,17 +592,6 @@ public class NativeFlinkServiceTest { }; } - private Deployment createTestingDeployment() { - return new DeploymentBuilder() - .withNewMetadata() - .withName(TestUtils.TEST_DEPLOYMENT_NAME) - .withNamespace(TestUtils.TEST_NAMESPACE) - .endMetadata() - .withNewSpec() - .endSpec() - .build(); - } - private Configuration createOperatorConfig() { Map<String, String> configMap = Map.of(OPERATOR_HEALTH_PROBE_PORT.key(), "80"); Configuration deployConfig = Configuration.fromMap(configMap); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/RestResponseTest.java 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/RestResponseTest.java new file mode 100644 index 00000000..a0ffd6e2 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/RestResponseTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.service; + +import org.apache.flink.runtime.rest.util.RestMapperUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for custom rest response classes. 
*/ +public class RestResponseTest { + + private ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + @Test + public void testGetLastSavepointRestCompatibility() { + String flink14Response = + "{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0},\"summary\":{\"state_size\":{\"min\":8646,\"max\":25626,\"avg\":17136},\"end_to_end_duration\":{\"min\":95,\"max\":420,\"avg\":257},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0},\"processed_data\":{\"min\":0,\"max\":70,\"avg\":35},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0}},\"latest\":{\"completed\":{\"@class\":\"completed\",\"id\":1,\"status\":\"COMPLETED\",\"is_save [...] + String flink15Response = + "{\"counts\":{\"restored\":0,\"total\":12,\"in_progress\":0,\"completed\":3,\"failed\":9},\"summary\":{\"checkpointed_size\":{\"min\":4308,\"max\":16053,\"avg\":11856,\"p50\":15207,\"p90\":16053,\"p95\":16053,\"p99\":16053,\"p999\":16053},\"state_size\":{\"min\":4308,\"max\":16053,\"avg\":11856,\"p50\":15207,\"p90\":16053,\"p95\":16053,\"p99\":16053,\"p999\":16053},\"end_to_end_duration\":{\"min\":31,\"max\":117,\"avg\":61,\"p50\":36,\"p90\":117,\"p95\":117,\"p99\":117,\" [...] 
+ + assertDoesNotThrow( + () -> objectMapper.readValue(flink14Response, CheckpointHistoryWrapper.class)); + assertDoesNotThrow( + () -> objectMapper.readValue(flink15Response, CheckpointHistoryWrapper.class)); + } + + @Test + public void testGetInProgressCheckpointsFromResponseWithoutHistoryDetails() + throws JsonProcessingException { + String response = + "{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0}}"; + var checkpointHistoryWrapper = + objectMapper.readValue(response, CheckpointHistoryWrapper.class); + Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo = + assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint); + assertTrue(optionalPendingCheckpointInfo.isEmpty()); + } + + @Test + public void testGetInProgressCheckpointsWithoutHistory() { + CheckpointHistoryWrapper checkpointHistoryWrapper = new CheckpointHistoryWrapper(); + Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo = + assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint); + assertTrue(optionalPendingCheckpointInfo.isEmpty()); + } + + @Test + public void testClusterInfoRestCompatibility() { + String flink13Response = + "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.13.6\",\"flink-revision\":\"b2ca390 @ 2022-02-03T14:54:22+01:00\",\"features\":{\"web-submit\":false}}"; + String flink14Response = + "{\"refresh-interval\":3000,\"timezone-name\":\"Coordinated Universal Time\",\"timezone-offset\":0,\"flink-version\":\"1.14.4\",\"flink-revision\":\"895c609 @ 2022-02-25T11:57:14+01:00\",\"features\":{\"web-submit\":false,\"web-cancel\":false}}"; + + assertDoesNotThrow( + () -> objectMapper.readValue(flink13Response, CustomDashboardConfiguration.class)); + assertDoesNotThrow( + () -> objectMapper.readValue(flink14Response, CustomDashboardConfiguration.class)); + } +}
