This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 11944da [FLINK-26367] Move sanity check in FlinkService#cancelJob to DefaultDeploymentValidator
11944da is described below
commit 11944da080373834f26c726895f3b90b1d24b46c
Author: SteNicholas <[email protected]>
AuthorDate: Wed Mar 2 10:49:51 2022 +0800
[FLINK-26367] Move sanity check in FlinkService#cancelJob to DefaultDeploymentValidator
This closes #32.
---
.../flink/kubernetes/operator/service/FlinkService.java | 6 ------
.../operator/validation/DefaultDeploymentValidator.java | 10 ++++++++--
.../kubernetes/operator/service/FlinkServiceTest.java | 15 ---------------
.../operator/validation/DeploymentValidatorTest.java | 3 +++
flink-kubernetes-webhook/pom.xml | 6 +++++-
5 files changed, 16 insertions(+), 24 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index dd3d8fb..9822d90 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -35,11 +35,9 @@ import
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorato
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.slf4j.Logger;
@@ -153,10 +151,6 @@ public class FlinkService {
savepointOpt = Optional.of(savepoint);
break;
case LAST_STATE:
- if
(!HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
- throw new InvalidDeploymentException(
- "Job could not be upgraded with last-state
while HA disabled");
- }
final String namespace =
conf.getString(KubernetesConfigOptions.NAMESPACE);
final String clusterId = clusterClient.getClusterId();
FlinkUtils.deleteCluster(namespace, clusterId,
kubernetesClient, false);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 2a385be..b1208a5 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -43,7 +43,7 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
FlinkDeploymentSpec spec = deployment.getSpec();
return firstPresent(
validateFlinkConfig(spec.getFlinkConfiguration()),
- validateJobSpec(spec.getJob()),
+ validateJobSpec(spec.getJob(), spec.getFlinkConfiguration()),
validateJmSpec(spec.getJobManager(),
spec.getFlinkConfiguration()),
validateTmSpec(spec.getTaskManager()),
validateSpecChange(deployment));
@@ -71,7 +71,7 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
return Optional.empty();
}
- private Optional<String> validateJobSpec(JobSpec job) {
+ private Optional<String> validateJobSpec(JobSpec job, Map<String, String>
confMap) {
if (job == null) {
return Optional.empty();
}
@@ -84,6 +84,12 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
return Optional.of("Jar URI must be defined");
}
+ if (job.getUpgradeMode() == UpgradeMode.LAST_STATE
+ && !HighAvailabilityMode.isHighAvailabilityModeActivated(
+ Configuration.fromMap(confMap))) {
+ return Optional.of("Job could not be upgraded with last-state
while HA disabled");
+ }
+
return Optional.empty();
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
index 594aeea..c760629 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -26,7 +26,6 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.TestingClusterClient;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.runtime.messages.Acknowledge;
import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -44,7 +43,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** @link FlinkService unit tests */
@@ -140,19 +138,6 @@ public class FlinkServiceTest {
.get());
}
- @Test
- public void testCancelJobWithLastStateUpgradeModeWhenHADisabled() {
- configuration.set(HighAvailabilityOptions.HA_MODE, "None");
- final TestingClusterClient<String> testingClusterClient =
- new TestingClusterClient<>(CLUSTER_ID);
- final FlinkService flinkService =
createFlinkService(testingClusterClient);
-
- final JobID jobID = JobID.generate();
- assertThrows(
- InvalidDeploymentException.class,
- () -> flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE,
configuration));
- }
-
private FlinkService createFlinkService(ClusterClient<String>
clusterClient) {
return new FlinkService((NamespacedKubernetesClient) client) {
@Override
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 40fd1c7..6cc8d09 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -57,6 +57,9 @@ public class DeploymentValidatorTest {
testError(
dep -> dep.getSpec().getJob().setParallelism(-1),
"Job parallelism must be larger than 0");
+ testError(
+ dep ->
dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE),
+ "Job could not be upgraded with last-state while HA disabled");
// Test conf validation
testSuccess(
diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml
index 0a82987..1bcd4ca 100644
--- a/flink-kubernetes-webhook/pom.xml
+++ b/flink-kubernetes-webhook/pom.xml
@@ -39,7 +39,11 @@ under the License.
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
- <artifactId>*</artifactId>
+ <artifactId>flink-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-netty</artifactId>
</exclusion>
</exclusions>
</dependency>