This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch release-1.16.0.1-acs
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81b41165f89456c8b8ae19c99f8b50873d7c33d4
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Fri Feb 25 09:29:08 2022 +0100

    [apple][internal] Support excluding kubernetes pod decorators
---
 .../configuration/KubernetesConfigOptions.java     |  8 +++++
 .../decorators/KerberosMountDecorator.java         |  7 ++--
 .../factory/KubernetesJobManagerFactory.java       |  7 ++++
 .../factory/KubernetesTaskManagerFactory.java      |  8 +++++
 .../parameters/AbstractKubernetesParameters.java   | 10 ++++++
 .../parameters/KubernetesParameters.java           |  3 ++
 .../factory/KubernetesJobManagerFactoryTest.java   | 38 +++++++++++++++++++++-
 .../factory/KubernetesTaskManagerFactoryTest.java  | 31 ++++++++++++++++++
 .../AbstractKubernetesParametersTest.java          |  6 ++++
 9 files changed, 114 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 9cce89003bc..a566fbd4865 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -530,6 +530,14 @@ public class KubernetesConfigOptions {
                             "The node label whose value is the same as the 
node name. "
                                     + "Currently, this will only be used to 
set the node affinity of TM pods to avoid being scheduled on blocked nodes.");
 
+    public static final ConfigOption<List<String>> DECORATOR_EXCLUDE =
+            key("kubernetes.decorator.exclude")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A semicolon-separated list of the Kubernetes step 
decorator class names to be excluded from the JM/TM factories");
+
     private static String getDefaultFlinkImage() {
         // The default container image that ties to the exact needed versions 
of both Flink and
         // Scala.
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/KerberosMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/KerberosMountDecorator.java
index f6440f1decc..dc68e09c95c 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/KerberosMountDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/KerberosMountDecorator.java
@@ -51,16 +51,15 @@ public class KerberosMountDecorator extends AbstractKubernetesStepDecorator {
     private static final Logger LOG = 
LoggerFactory.getLogger(KerberosMountDecorator.class);
 
     private final AbstractKubernetesParameters kubernetesParameters;
-    private final SecurityConfiguration securityConfig;
 
     public KerberosMountDecorator(AbstractKubernetesParameters 
kubernetesParameters) {
         this.kubernetesParameters = checkNotNull(kubernetesParameters);
-        this.securityConfig =
-                new 
SecurityConfiguration(kubernetesParameters.getFlinkConfiguration());
     }
 
     @Override
     public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
+        SecurityConfiguration securityConfig =
+                new 
SecurityConfiguration(kubernetesParameters.getFlinkConfiguration());
         PodBuilder podBuilder = new 
PodBuilder(flinkPod.getPodWithoutMainContainer());
         ContainerBuilder containerBuilder = new 
ContainerBuilder(flinkPod.getMainContainer());
 
@@ -131,6 +130,8 @@ public class KerberosMountDecorator extends AbstractKubernetesStepDecorator {
 
     @Override
     public List<HasMetadata> buildAccompanyingKubernetesResources() throws 
IOException {
+        SecurityConfiguration securityConfig =
+                new 
SecurityConfiguration(kubernetesParameters.getFlinkConfiguration());
 
         final List<HasMetadata> resources = new ArrayList<>();
 
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java
index 9d88e74ef6a..ae87ab676e6 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java
@@ -47,6 +47,7 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -75,7 +76,13 @@ public class KubernetesJobManagerFactory {
                     new 
PodTemplateMountDecorator(kubernetesJobManagerParameters)
                 };
 
+        Set<String> excludedDecorators =
+                kubernetesJobManagerParameters.getExcludedDecoratorClasses();
+
         for (KubernetesStepDecorator stepDecorator : stepDecorators) {
+            if 
(excludedDecorators.contains(stepDecorator.getClass().getName())) {
+                continue;
+            }
             flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
             
accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources());
         }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java
index 3a323a831a7..e4d3c4abb11 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java
@@ -34,6 +34,8 @@ import org.apache.flink.util.Preconditions;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 
+import java.util.Set;
+
 /** Utility class for constructing the TaskManager Pod on the JobManager. */
 public class KubernetesTaskManagerFactory {
 
@@ -52,7 +54,13 @@ public class KubernetesTaskManagerFactory {
                     new 
FlinkConfMountDecorator(kubernetesTaskManagerParameters)
                 };
 
+        Set<String> excludedDecorators =
+                kubernetesTaskManagerParameters.getExcludedDecoratorClasses();
+
         for (KubernetesStepDecorator stepDecorator : stepDecorators) {
+            if 
(excludedDecorators.contains(stepDecorator.getClass().getName())) {
+                continue;
+            }
             flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
         }
 
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index c9bd7ef0397..c768e576043 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -29,9 +29,11 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS;
 import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
@@ -206,4 +208,12 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
     public boolean isHostNetworkEnabled() {
         return 
flinkConfig.getBoolean(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED);
     }
+
+    @Override
+    public Set<String> getExcludedDecoratorClasses() {
+        return new HashSet<>(
+                flinkConfig
+                        .getOptional(KubernetesConfigOptions.DECORATOR_EXCLUDE)
+                        .orElse(Collections.emptyList()));
+    }
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
index c6b335449a8..7e3aad2715d 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
@@ -25,6 +25,7 @@ import io.fabric8.kubernetes.api.model.LocalObjectReference;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * A common collection of parameters that is used to construct the 
JobManager/TaskManager Pods,
@@ -110,4 +111,6 @@ public interface KubernetesParameters {
      * container(s).
      */
     List<Map<String, String>> getEnvironmentsFromSecrets();
+
+    Set<String> getExcludedDecoratorClasses();
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
index 302655dd5c3..7bddbb96484 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.kubernetes.kubeclient.factory;
 
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.SecurityOptions;
@@ -34,6 +36,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator
 import 
org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
 import 
org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -349,7 +352,40 @@ class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBase {
     }
 
     @Test
-    void testFlinkConfConfigMap() throws IOException {
+    public void testDecoratorExclusion() throws IOException {
+        Configuration confCopy = new Configuration(flinkConfig);
+        confCopy.set(SecurityOptions.KERBEROS_LOGIN_KEYTAB, "missing.file");
+        confCopy.set(
+                KubernetesConfigOptions.DECORATOR_EXCLUDE,
+                Collections.singletonList(
+                        
"org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator"));
+        kubernetesJobManagerSpecification =
+                
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        flinkPod,
+                        new KubernetesJobManagerParameters(
+                                confCopy,
+                                new 
ClusterSpecification.ClusterSpecificationBuilder()
+                                        .setMasterMemoryMB(1024)
+                                        .setTaskManagerMemoryMB(1024)
+                                        .setSlotsPerTaskManager(3)
+                                        .createClusterSpecification()));
+
+        boolean kerbSecretExists =
+                
this.kubernetesJobManagerSpecification.getAccompanyingResources().stream()
+                        .anyMatch(
+                                x ->
+                                        x instanceof Secret
+                                                && x.getMetadata()
+                                                        .getName()
+                                                        .equals(
+                                                                
KerberosMountDecorator
+                                                                        
.getKerberosKeytabSecretName(
+                                                                               
 CLUSTER_ID)));
+        assertThat(kerbSecretExists).isEqualTo(false);
+    }
+
+    @Test
+    public void testFlinkConfConfigMap() throws IOException {
         kubernetesJobManagerSpecification =
                 
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
                         flinkPod, kubernetesJobManagerParameters);
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
index f49b8ad996f..3d1b898a72d 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
@@ -18,16 +18,21 @@
 
 package org.apache.flink.kubernetes.kubeclient.factory;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.kubernetes.KubernetesTestUtils;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.kubeclient.KubernetesTaskManagerTestBase;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
 import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
 
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.Pod;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
@@ -98,4 +103,30 @@ class KubernetesTaskManagerFactoryTest extends KubernetesTaskManagerTestBase {
         assertThat(resultMainContainer.getArgs()).hasSize(3);
         assertThat(resultMainContainer.getVolumeMounts()).hasSize(4);
     }
+
+    @Test
+    public void testDecoratorExclusion() {
+        Configuration confCopy = new Configuration(flinkConfig);
+        confCopy.set(SecurityOptions.KERBEROS_LOGIN_KEYTAB, "missing.file");
+        confCopy.set(
+                KubernetesConfigOptions.DECORATOR_EXCLUDE,
+                Collections.singletonList(
+                        
"org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator"));
+
+        KubernetesTaskManagerParameters parameters =
+                new KubernetesTaskManagerParameters(
+                        confCopy,
+                        POD_NAME,
+                        DYNAMIC_PROPERTIES,
+                        JVM_MEM_OPTS_ENV,
+                        containeredTaskManagerParameters,
+                        
ExternalResourceUtils.getExternalResourceConfigurationKeys(
+                                flinkConfig,
+                                KubernetesConfigOptions
+                                        
.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX),
+                        BLOCKED_NODES);
+
+        KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(
+                new FlinkPod.Builder().build(), parameters);
+    }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
index 77dd53c7761..692fb1cd98e 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -193,5 +194,10 @@ public class AbstractKubernetesParametersTest {
         public List<Map<String, String>> getTolerations() {
             throw new UnsupportedOperationException("NOT supported");
         }
+
+        @Override
+        public Set<String> getExcludedDecoratorClasses() {
+            return Collections.emptySet();
+        }
     }
 }

Reply via email to