This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6472dce  [FLINK-28087] Add validation for the meta.name of 
FlinkDeployment CR
6472dce is described below

commit 6472dcea93945760722ce5abbeaae0a63f87840a
Author: PengYuan <[email protected]>
AuthorDate: Thu Jun 16 22:00:37 2022 +0800

    [FLINK-28087] Add validation for the meta.name of FlinkDeployment CR
---
 .../operator/validation/DefaultValidator.java           | 17 ++++++++++++++++-
 .../operator/validation/DefaultValidatorTest.java       |  6 ++++++
 .../operator/admission/AdmissionHandlerTest.java        |  4 ++++
 3 files changed, 26 insertions(+), 1 deletion(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 0c28ff8..8448b3d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -48,10 +48,13 @@ import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /** Default validator implementation for {@link FlinkDeployment}. */
 public class DefaultValidator implements FlinkResourceValidator {
-
+    private static final Pattern DEPLOYMENT_NAME_PATTERN =
+            Pattern.compile("[a-z]([-a-z\\d]{0,43}[a-z\\d])?");
     private static final String[] FORBIDDEN_CONF_KEYS =
             new String[] {
                 KubernetesConfigOptions.NAMESPACE.key(), 
KubernetesConfigOptions.CLUSTER_ID.key()
@@ -77,6 +80,7 @@ public class DefaultValidator implements 
FlinkResourceValidator {
             effectiveConfig.putAll(spec.getFlinkConfiguration());
         }
         return firstPresent(
+                validateDeploymentName(deployment.getMetadata().getName()),
                 validateFlinkVersion(spec.getFlinkVersion()),
                 validateFlinkDeploymentConfig(effectiveConfig),
                 validateIngress(
@@ -100,6 +104,17 @@ public class DefaultValidator implements 
FlinkResourceValidator {
         return Optional.empty();
     }
 
+    private Optional<String> validateDeploymentName(String name) {
+        Matcher matcher = DEPLOYMENT_NAME_PATTERN.matcher(name);
+        if (!matcher.matches()) {
+            return Optional.of(
+                    String.format(
+                            "The FlinkDeployment name: %s is invalid, must 
consist of lower case alphanumeric characters or '-', start with an alphabetic 
character, and end with an alphanumeric character (e.g. 'my-name',  or 
'abc-123'), and the length must be no more than 45 characters.",
+                            name));
+        }
+        return Optional.empty();
+    }
+
     private Optional<String> validateFlinkVersion(FlinkVersion version) {
         if (version == null) {
             return Optional.of("Flink Version must be defined.");
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 235bb1b..d0dce8f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -68,6 +68,12 @@ public class DefaultValidatorTest {
     public void testValidationWithoutDefaultConfig() {
         testSuccess(dep -> {});
 
+        // Test meta.name
+        testSuccess(dep -> dep.getMetadata().setName("session-cluster"));
+        testError(
+                dep -> dep.getMetadata().setName("session-cluster-1.13"),
+                "The FlinkDeployment name: session-cluster-1.13 is invalid, 
must consist of lower case alphanumeric characters or '-', start with an 
alphabetic character, and end with an alphanumeric character (e.g. 'my-name',  
or 'abc-123'), and the length must be no more than 45 characters.");
+
         // Test job validation
         testError(dep -> dep.getSpec().getJob().setJarURI(null), "Jar URI must 
be defined");
         testError(
diff --git 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
index 9476bd0..327981a 100644
--- 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
+++ 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpRes
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.exc.MismatchedInputException;
 import io.fabric8.kubernetes.api.model.GroupVersionKind;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
 import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview;
 import org.junit.jupiter.api.Assertions;
@@ -99,6 +100,9 @@ public class AdmissionHandlerTest {
     public void testHandleValidateRequestWithAdmissionReview() throws 
IOException {
         final EmbeddedChannel embeddedChannel = new 
EmbeddedChannel(admissionHandler);
         final FlinkDeployment flinkDeployment = new FlinkDeployment();
+        ObjectMeta objectMeta = new ObjectMeta();
+        objectMeta.setName("basic-session-cluster");
+        flinkDeployment.setMetadata(objectMeta);
         flinkDeployment.setSpec(new FlinkDeploymentSpec());
         final AdmissionRequest admissionRequest = new AdmissionRequest();
         admissionRequest.setOperation(CREATE.name());

Reply via email to