This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 11944da  [FLINK-26367] Move sanity check in FlinkService#cancelJob to DefaultDeploymentValidator
11944da is described below

commit 11944da080373834f26c726895f3b90b1d24b46c
Author: SteNicholas <[email protected]>
AuthorDate: Wed Mar 2 10:49:51 2022 +0800

    [FLINK-26367] Move sanity check in FlinkService#cancelJob to DefaultDeploymentValidator
    
    This closes #32.
---
 .../flink/kubernetes/operator/service/FlinkService.java   |  6 ------
 .../operator/validation/DefaultDeploymentValidator.java   | 10 ++++++++--
 .../kubernetes/operator/service/FlinkServiceTest.java     | 15 ---------------
 .../operator/validation/DeploymentValidatorTest.java      |  3 +++
 flink-kubernetes-webhook/pom.xml                          |  6 +++++-
 5 files changed, 16 insertions(+), 24 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index dd3d8fb..9822d90 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -35,11 +35,9 @@ import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorato
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.slf4j.Logger;
@@ -153,10 +151,6 @@ public class FlinkService {
                     savepointOpt = Optional.of(savepoint);
                     break;
                 case LAST_STATE:
-                    if (!HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
-                        throw new InvalidDeploymentException(
-                                "Job could not be upgraded with last-state while HA disabled");
-                    }
                 final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
                 final String clusterId = clusterClient.getClusterId();
                 FlinkUtils.deleteCluster(namespace, clusterId, kubernetesClient, false);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 2a385be..b1208a5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -43,7 +43,7 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
         FlinkDeploymentSpec spec = deployment.getSpec();
         return firstPresent(
                 validateFlinkConfig(spec.getFlinkConfiguration()),
-                validateJobSpec(spec.getJob()),
+                validateJobSpec(spec.getJob(), spec.getFlinkConfiguration()),
                 validateJmSpec(spec.getJobManager(), spec.getFlinkConfiguration()),
                 validateTmSpec(spec.getTaskManager()),
                 validateSpecChange(deployment));
@@ -71,7 +71,7 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
         return Optional.empty();
     }
 
-    private Optional<String> validateJobSpec(JobSpec job) {
+    private Optional<String> validateJobSpec(JobSpec job, Map<String, String> confMap) {
         if (job == null) {
             return Optional.empty();
         }
@@ -84,6 +84,12 @@ public class DefaultDeploymentValidator implements 
FlinkDeploymentValidator {
             return Optional.of("Jar URI must be defined");
         }
 
+        if (job.getUpgradeMode() == UpgradeMode.LAST_STATE
+                && !HighAvailabilityMode.isHighAvailabilityModeActivated(
+                        Configuration.fromMap(confMap))) {
+            return Optional.of("Job could not be upgraded with last-state while HA disabled");
+        }
+
         return Optional.empty();
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
index 594aeea..c760629 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.operator.TestingClusterClient;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.runtime.messages.Acknowledge;
 
 import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -44,7 +43,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link FlinkService unit tests */
@@ -140,19 +138,6 @@ public class FlinkServiceTest {
                         .get());
     }
 
-    @Test
-    public void testCancelJobWithLastStateUpgradeModeWhenHADisabled() {
-        configuration.set(HighAvailabilityOptions.HA_MODE, "None");
-        final TestingClusterClient<String> testingClusterClient =
-                new TestingClusterClient<>(CLUSTER_ID);
-        final FlinkService flinkService = createFlinkService(testingClusterClient);
-
-        final JobID jobID = JobID.generate();
-        assertThrows(
-                InvalidDeploymentException.class,
-                () -> flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, configuration));
-    }
-
     private FlinkService createFlinkService(ClusterClient<String> clusterClient) {
         return new FlinkService((NamespacedKubernetesClient) client) {
             @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 40fd1c7..6cc8d09 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -57,6 +57,9 @@ public class DeploymentValidatorTest {
         testError(
                 dep -> dep.getSpec().getJob().setParallelism(-1),
                 "Job parallelism must be larger than 0");
+        testError(
+                dep -> dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE),
+                "Job could not be upgraded with last-state while HA disabled");
 
         // Test conf validation
         testSuccess(
diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml
index 0a82987..1bcd4ca 100644
--- a/flink-kubernetes-webhook/pom.xml
+++ b/flink-kubernetes-webhook/pom.xml
@@ -39,7 +39,11 @@ under the License.
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.flink</groupId>
-                    <artifactId>*</artifactId>
+                    <artifactId>flink-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-shaded-netty</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>

Reply via email to