This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch release-1.16.0.1-acs in repository https://gitbox.apache.org/repos/asf/flink.git
commit 81b41165f89456c8b8ae19c99f8b50873d7c33d4 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Fri Feb 25 09:29:08 2022 +0100 [apple][internal] Support excluding kubernetes pod decorators --- .../configuration/KubernetesConfigOptions.java | 8 +++++ .../decorators/KerberosMountDecorator.java | 7 ++-- .../factory/KubernetesJobManagerFactory.java | 7 ++++ .../factory/KubernetesTaskManagerFactory.java | 8 +++++ .../parameters/AbstractKubernetesParameters.java | 10 ++++++ .../parameters/KubernetesParameters.java | 3 ++ .../factory/KubernetesJobManagerFactoryTest.java | 38 +++++++++++++++++++++- .../factory/KubernetesTaskManagerFactoryTest.java | 31 ++++++++++++++++++ .../AbstractKubernetesParametersTest.java | 6 ++++ 9 files changed, 114 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 9cce89003bc..a566fbd4865 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -530,6 +530,14 @@ public class KubernetesConfigOptions { "The node label whose value is the same as the node name. " + "Currently, this will only be used to set the node affinity of TM pods to avoid being scheduled on blocked nodes."); + public static final ConfigOption<List<String>> DECORATOR_EXCLUDE = + key("kubernetes.decorator.exclude") + .stringType() + .asList() + .noDefaultValue() + .withDescription( + "A semicolon-separated list of the Kubernetes step decorator class names to be excluded from the JM/TM factories"); + private static String getDefaultFlinkImage() { // The default container image that ties to the exact needed versions of both Flink and // Scala. 
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/KerberosMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/KerberosMountDecorator.java index f6440f1decc..dc68e09c95c 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/KerberosMountDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/KerberosMountDecorator.java @@ -51,16 +51,15 @@ public class KerberosMountDecorator extends AbstractKubernetesStepDecorator { private static final Logger LOG = LoggerFactory.getLogger(KerberosMountDecorator.class); private final AbstractKubernetesParameters kubernetesParameters; - private final SecurityConfiguration securityConfig; public KerberosMountDecorator(AbstractKubernetesParameters kubernetesParameters) { this.kubernetesParameters = checkNotNull(kubernetesParameters); - this.securityConfig = - new SecurityConfiguration(kubernetesParameters.getFlinkConfiguration()); } @Override public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + SecurityConfiguration securityConfig = + new SecurityConfiguration(kubernetesParameters.getFlinkConfiguration()); PodBuilder podBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer()); ContainerBuilder containerBuilder = new ContainerBuilder(flinkPod.getMainContainer()); @@ -131,6 +130,8 @@ public class KerberosMountDecorator extends AbstractKubernetesStepDecorator { @Override public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException { + SecurityConfiguration securityConfig = + new SecurityConfiguration(kubernetesParameters.getFlinkConfiguration()); final List<HasMetadata> resources = new ArrayList<>(); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java index 9d88e74ef6a..ae87ab676e6 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java @@ -47,6 +47,7 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -75,7 +76,13 @@ public class KubernetesJobManagerFactory { new PodTemplateMountDecorator(kubernetesJobManagerParameters) }; + Set<String> excludedDecorators = + kubernetesJobManagerParameters.getExcludedDecoratorClasses(); + for (KubernetesStepDecorator stepDecorator : stepDecorators) { + if (excludedDecorators.contains(stepDecorator.getClass().getName())) { + continue; + } flinkPod = stepDecorator.decorateFlinkPod(flinkPod); accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources()); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java index 3a323a831a7..e4d3c4abb11 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java @@ -34,6 +34,8 @@ import org.apache.flink.util.Preconditions; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; +import java.util.Set; + /** Utility class for constructing the TaskManager Pod on the JobManager. 
*/ public class KubernetesTaskManagerFactory { @@ -52,7 +54,13 @@ public class KubernetesTaskManagerFactory { new FlinkConfMountDecorator(kubernetesTaskManagerParameters) }; + Set<String> excludedDecorators = + kubernetesTaskManagerParameters.getExcludedDecoratorClasses(); + for (KubernetesStepDecorator stepDecorator : stepDecorators) { + if (excludedDecorators.contains(stepDecorator.getClass().getName())) { + continue; + } flinkPod = stepDecorator.decorateFlinkPod(flinkPod); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java index c9bd7ef0397..c768e576043 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java @@ -29,9 +29,11 @@ import org.apache.commons.lang3.StringUtils; import java.io.File; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS; import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME; @@ -206,4 +208,12 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete public boolean isHostNetworkEnabled() { return flinkConfig.getBoolean(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED); } + + @Override + public Set<String> getExcludedDecoratorClasses() { + return new HashSet<>( + flinkConfig + .getOptional(KubernetesConfigOptions.DECORATOR_EXCLUDE) + .orElse(Collections.emptyList())); + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java index c6b335449a8..7e3aad2715d 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.api.model.LocalObjectReference; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; /** * A common collection of parameters that is used to construct the JobManager/TaskManager Pods, @@ -110,4 +111,6 @@ public interface KubernetesParameters { * container(s). */ List<Map<String, String>> getEnvironmentsFromSecrets(); + + Set<String> getExcludedDecoratorClasses(); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java index 302655dd5c3..7bddbb96484 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java @@ -18,6 +18,8 @@ package org.apache.flink.kubernetes.kubeclient.factory; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.SecurityOptions; @@ -34,6 +36,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator; import 
org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.kubernetes.utils.KubernetesUtils; @@ -349,7 +352,40 @@ class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBase { } @Test - void testFlinkConfConfigMap() throws IOException { + public void testDecoratorExclusion() throws IOException { + Configuration confCopy = new Configuration(flinkConfig); + confCopy.set(SecurityOptions.KERBEROS_LOGIN_KEYTAB, "missing.file"); + confCopy.set( + KubernetesConfigOptions.DECORATOR_EXCLUDE, + Collections.singletonList( + "org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator")); + kubernetesJobManagerSpecification = + KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( + flinkPod, + new KubernetesJobManagerParameters( + confCopy, + new ClusterSpecification.ClusterSpecificationBuilder() + .setMasterMemoryMB(1024) + .setTaskManagerMemoryMB(1024) + .setSlotsPerTaskManager(3) + .createClusterSpecification())); + + boolean kerbSecretExists = + this.kubernetesJobManagerSpecification.getAccompanyingResources().stream() + .anyMatch( + x -> + x instanceof Secret + && x.getMetadata() + .getName() + .equals( + KerberosMountDecorator + .getKerberosKeytabSecretName( + CLUSTER_ID))); + assertThat(kerbSecretExists).isEqualTo(false); + } + + @Test + public void testFlinkConfConfigMap() throws IOException { kubernetesJobManagerSpecification = KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( flinkPod, kubernetesJobManagerParameters); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java index f49b8ad996f..3d1b898a72d 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java @@ -18,16 +18,21 @@ package org.apache.flink.kubernetes.kubeclient.factory; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.kubernetes.KubernetesTestUtils; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.kubeclient.KubernetesTaskManagerTestBase; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.Pod; import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.List; import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME; @@ -98,4 +103,30 @@ class KubernetesTaskManagerFactoryTest extends KubernetesTaskManagerTestBase { assertThat(resultMainContainer.getArgs()).hasSize(3); assertThat(resultMainContainer.getVolumeMounts()).hasSize(4); } + + @Test + public void testDecoratorExclusion() { + Configuration confCopy = new Configuration(flinkConfig); + confCopy.set(SecurityOptions.KERBEROS_LOGIN_KEYTAB, "missing.file"); + confCopy.set( + KubernetesConfigOptions.DECORATOR_EXCLUDE, + Collections.singletonList( + "org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator")); + + KubernetesTaskManagerParameters parameters = + new KubernetesTaskManagerParameters( + confCopy, + 
POD_NAME, + DYNAMIC_PROPERTIES, + JVM_MEM_OPTS_ENV, + containeredTaskManagerParameters, + ExternalResourceUtils.getExternalResourceConfigurationKeys( + flinkConfig, + KubernetesConfigOptions + .EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX), + BLOCKED_NODES); + + KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod( + new FlinkPod.Builder().build(), parameters); + } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java index 77dd53c7761..692fb1cd98e 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.Set; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; @@ -193,5 +194,10 @@ public class AbstractKubernetesParametersTest { public List<Map<String, String>> getTolerations() { throw new UnsupportedOperationException("NOT supported"); } + + @Override + public Set<String> getExcludedDecoratorClasses() { + return Collections.emptySet(); + } } }