This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a4a99ba  [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
a4a99ba is described below

commit a4a99bac919d57387d54a2db80a249becf3ba680
Author: wangyang0918 <[email protected]>
AuthorDate: Fri Jun 5 16:33:37 2020 +0800

    [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
    
    DeploymentOptionsInternal#CONF_DIR is an internal option that stores the client config path. It should not be added to the config map or used by the JobManager pod. Instead, KubernetesConfigOptions#FLINK_CONF_DIR will be used.
    
    This closes #12501.
---
 .../decorators/FlinkConfMountDecorator.java        | 11 ++++-----
 .../parameters/AbstractKubernetesParameters.java   |  4 +++-
 .../flink/kubernetes/KubernetesTestUtils.java      | 15 +++++++++++++
 .../decorators/FlinkConfMountDecoratorTest.java    | 26 +++++++++++++++-------
 .../AbstractKubernetesParametersTest.java          | 16 +++++++++++++
 5 files changed, 58 insertions(+), 14 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 79eb7aa..ef24fa3 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import 
org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
@@ -144,11 +145,11 @@ public class FlinkConfMountDecorator extends 
AbstractKubernetesStepDecorator {
         * Get properties map for the cluster-side after removal of some keys.
         */
        private Map<String, String> getClusterSidePropertiesMap(Configuration 
flinkConfig) {
-               final Map<String, String> propertiesMap = flinkConfig.toMap();
-
-               // remove kubernetes.config.file
-               
propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
-               return propertiesMap;
+               final Configuration clusterSideConfig = flinkConfig.clone();
+               // Remove some configuration options that should not be taken 
to cluster side.
+               
clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
+               
clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
+               return clusterSideConfig.toMap();
        }
 
        @VisibleForTesting
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index c655b63..85a33d6 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -56,7 +56,9 @@ public abstract class AbstractKubernetesParameters implements 
KubernetesParamete
 
        @Override
        public String getConfigDirectory() {
-               final String configDir = 
flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
+               final String configDir = 
flinkConfig.getOptional(DeploymentOptionsInternal.CONF_DIR).orElse(
+                       
flinkConfig.getString(KubernetesConfigOptions.FLINK_CONF_DIR));
+
                checkNotNull(configDir);
                return configDir;
        }
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
index f9dea84..6d1f8e7 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.kubernetes;
 
+import org.apache.flink.configuration.Configuration;
+
 import org.apache.flink.shaded.guava18.com.google.common.io.Files;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -32,4 +36,15 @@ public class KubernetesTestUtils {
        public static void createTemporyFile(String data, File directory, 
String fileName) throws IOException {
                Files.write(data, new File(directory, fileName), 
StandardCharsets.UTF_8);
        }
+
+       public static Configuration loadConfigurationFromString(String content) 
{
+               final Configuration configuration = new Configuration();
+               for (String line : content.split(System.lineSeparator())) {
+                       final String[] splits = line.split(":");
+                       if (splits.length >= 2) {
+                               configuration.setString(splits[0].trim(), 
StringUtils.substringAfter(line, ":").trim());
+                       }
+               }
+               return configuration;
+       }
 }
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
index da5069f..05dc938 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.kubernetes.kubeclient.decorators;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.KubernetesTestUtils;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
@@ -42,9 +44,12 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
+import static 
org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator.getFlinkConfConfigMapName;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 
 /**
  * General tests for the {@link FlinkConfMountDecorator}.
@@ -88,15 +93,20 @@ public class FlinkConfMountDecoratorTest extends 
KubernetesJobManagerTestBase {
 
                assertEquals(Constants.API_VERSION, 
resultConfigMap.getApiVersion());
 
-               
assertEquals(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID),
+               assertEquals(getFlinkConfConfigMapName(CLUSTER_ID),
                                resultConfigMap.getMetadata().getName());
                assertEquals(getCommonLabels(), 
resultConfigMap.getMetadata().getLabels());
 
                Map<String, String> resultDatas = resultConfigMap.getData();
                assertEquals("some data", resultDatas.get("logback.xml"));
                assertEquals("some data", resultDatas.get("log4j.properties"));
-               
assertTrue(resultDatas.get(FLINK_CONF_FILENAME).contains(KubernetesConfigOptions.FLINK_CONF_DIR.key()
 +
-                               ": " + FLINK_CONF_DIR_IN_POD));
+
+               final Configuration resultFlinkConfig = 
KubernetesTestUtils.loadConfigurationFromString(
+                       resultDatas.get(FLINK_CONF_FILENAME));
+               
assertThat(resultFlinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR), 
is(FLINK_CONF_DIR_IN_POD));
+               // The following config options should not be added to config 
map
+               
assertThat(resultFlinkConfig.get(KubernetesConfigOptions.KUBE_CONFIG_FILE), 
is(nullValue()));
+               
assertThat(resultFlinkConfig.get(DeploymentOptionsInternal.CONF_DIR), 
is(nullValue()));
        }
 
        @Test
@@ -112,7 +122,7 @@ public class FlinkConfMountDecoratorTest extends 
KubernetesJobManagerTestBase {
                        new VolumeBuilder()
                                .withName(Constants.FLINK_CONF_VOLUME)
                                .withNewConfigMap()
-                                       
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+                                       
.withName(getFlinkConfConfigMapName(CLUSTER_ID))
                                        .withItems(expectedKeyToPaths)
                                        .endConfigMap()
                                .build());
@@ -145,7 +155,7 @@ public class FlinkConfMountDecoratorTest extends 
KubernetesJobManagerTestBase {
                        new VolumeBuilder()
                                .withName(Constants.FLINK_CONF_VOLUME)
                                .withNewConfigMap()
-                               
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+                               .withName(getFlinkConfConfigMapName(CLUSTER_ID))
                                .withItems(expectedKeyToPaths)
                                .endConfigMap()
                                .build());
@@ -171,7 +181,7 @@ public class FlinkConfMountDecoratorTest extends 
KubernetesJobManagerTestBase {
                        new VolumeBuilder()
                                .withName(Constants.FLINK_CONF_VOLUME)
                                .withNewConfigMap()
-                               
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+                               .withName(getFlinkConfConfigMapName(CLUSTER_ID))
                                .withItems(expectedKeyToPaths)
                                .endConfigMap()
                                .build());
@@ -202,7 +212,7 @@ public class FlinkConfMountDecoratorTest extends 
KubernetesJobManagerTestBase {
                        new VolumeBuilder()
                                .withName(Constants.FLINK_CONF_VOLUME)
                                .withNewConfigMap()
-                               
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+                               .withName(getFlinkConfConfigMapName(CLUSTER_ID))
                                .withItems(expectedKeyToPaths)
                                .endConfigMap()
                                .build());
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
index 371b75a..3386aa2 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.kubernetes.kubeclient.parameters;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.util.StringUtils;
@@ -31,6 +32,8 @@ import java.util.Map;
 import java.util.Random;
 
 import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 
 /**
  * General tests for the {@link AbstractKubernetesParameters}.
@@ -62,6 +65,19 @@ public class AbstractKubernetesParametersTest extends 
TestLogger {
                );
        }
 
+       @Test
+       public void getConfigDirectory() {
+               final String confDir = "/path/of/flink-conf";
+               flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, confDir);
+               assertThat(testingKubernetesParameters.getConfigDirectory(), 
is(confDir));
+       }
+
+       @Test
+       public void getConfigDirectoryFallbackToPodConfDir() {
+               final String confDirInPod = 
flinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR);
+               assertThat(testingKubernetesParameters.getConfigDirectory(), 
is(confDirInPod));
+       }
+
        private class TestingKubernetesParameters extends 
AbstractKubernetesParameters {
 
                public TestingKubernetesParameters(Configuration flinkConfig) {

Reply via email to