This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a4a99ba [FLINK-18149][k8s] Do not add
DeploymentOptionsInternal#CONF_DIR to config map
a4a99ba is described below
commit a4a99bac919d57387d54a2db80a249becf3ba680
Author: wangyang0918 <[email protected]>
AuthorDate: Fri Jun 5 16:33:37 2020 +0800
[FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config
map
DeploymentOptionsInternal#CONF_DIR is an internal option that stores the
client config path. It should not be added to the config map or used by the
JobManager pod. Instead, KubernetesConfigOptions#FLINK_CONF_DIR will be used.
This closes #12501.
---
.../decorators/FlinkConfMountDecorator.java | 11 ++++-----
.../parameters/AbstractKubernetesParameters.java | 4 +++-
.../flink/kubernetes/KubernetesTestUtils.java | 15 +++++++++++++
.../decorators/FlinkConfMountDecoratorTest.java | 26 +++++++++++++++-------
.../AbstractKubernetesParametersTest.java | 16 +++++++++++++
5 files changed, 58 insertions(+), 14 deletions(-)
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 79eb7aa..ef24fa3 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import
org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
@@ -144,11 +145,11 @@ public class FlinkConfMountDecorator extends
AbstractKubernetesStepDecorator {
* Get properties map for the cluster-side after removal of some keys.
*/
private Map<String, String> getClusterSidePropertiesMap(Configuration
flinkConfig) {
- final Map<String, String> propertiesMap = flinkConfig.toMap();
-
- // remove kubernetes.config.file
-
propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
- return propertiesMap;
+ final Configuration clusterSideConfig = flinkConfig.clone();
+ // Remove some configuration options that should not be taken
to cluster side.
+
clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
+
clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
+ return clusterSideConfig.toMap();
}
@VisibleForTesting
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index c655b63..85a33d6 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -56,7 +56,9 @@ public abstract class AbstractKubernetesParameters implements
KubernetesParamete
@Override
public String getConfigDirectory() {
- final String configDir =
flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
+ final String configDir =
flinkConfig.getOptional(DeploymentOptionsInternal.CONF_DIR).orElse(
+
flinkConfig.getString(KubernetesConfigOptions.FLINK_CONF_DIR));
+
checkNotNull(configDir);
return configDir;
}
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
index f9dea84..6d1f8e7 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
@@ -18,8 +18,12 @@
package org.apache.flink.kubernetes;
+import org.apache.flink.configuration.Configuration;
+
import org.apache.flink.shaded.guava18.com.google.common.io.Files;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -32,4 +36,15 @@ public class KubernetesTestUtils {
public static void createTemporyFile(String data, File directory,
String fileName) throws IOException {
Files.write(data, new File(directory, fileName),
StandardCharsets.UTF_8);
}
+
+ public static Configuration loadConfigurationFromString(String content)
{
+ final Configuration configuration = new Configuration();
+ for (String line : content.split(System.lineSeparator())) {
+ final String[] splits = line.split(":");
+ if (splits.length >= 2) {
+ configuration.setString(splits[0].trim(),
StringUtils.substringAfter(line, ":").trim());
+ }
+ }
+ return configuration;
+ }
}
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
index da5069f..05dc938 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.kubernetes.kubeclient.decorators;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.KubernetesTestUtils;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
@@ -42,9 +44,12 @@ import java.util.List;
import java.util.Map;
import static
org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
+import static
org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator.getFlinkConfConfigMapName;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
/**
* General tests for the {@link FlinkConfMountDecorator}.
@@ -88,15 +93,20 @@ public class FlinkConfMountDecoratorTest extends
KubernetesJobManagerTestBase {
assertEquals(Constants.API_VERSION,
resultConfigMap.getApiVersion());
-
assertEquals(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID),
+ assertEquals(getFlinkConfConfigMapName(CLUSTER_ID),
resultConfigMap.getMetadata().getName());
assertEquals(getCommonLabels(),
resultConfigMap.getMetadata().getLabels());
Map<String, String> resultDatas = resultConfigMap.getData();
assertEquals("some data", resultDatas.get("logback.xml"));
assertEquals("some data", resultDatas.get("log4j.properties"));
-
assertTrue(resultDatas.get(FLINK_CONF_FILENAME).contains(KubernetesConfigOptions.FLINK_CONF_DIR.key()
+
- ": " + FLINK_CONF_DIR_IN_POD));
+
+ final Configuration resultFlinkConfig =
KubernetesTestUtils.loadConfigurationFromString(
+ resultDatas.get(FLINK_CONF_FILENAME));
+
assertThat(resultFlinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR),
is(FLINK_CONF_DIR_IN_POD));
+ // The following config options should not be added to config
map
+
assertThat(resultFlinkConfig.get(KubernetesConfigOptions.KUBE_CONFIG_FILE),
is(nullValue()));
+
assertThat(resultFlinkConfig.get(DeploymentOptionsInternal.CONF_DIR),
is(nullValue()));
}
@Test
@@ -112,7 +122,7 @@ public class FlinkConfMountDecoratorTest extends
KubernetesJobManagerTestBase {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
-
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+
.withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
@@ -145,7 +155,7 @@ public class FlinkConfMountDecoratorTest extends
KubernetesJobManagerTestBase {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
-
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+ .withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
@@ -171,7 +181,7 @@ public class FlinkConfMountDecoratorTest extends
KubernetesJobManagerTestBase {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
-
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+ .withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
@@ -202,7 +212,7 @@ public class FlinkConfMountDecoratorTest extends
KubernetesJobManagerTestBase {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
-
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+ .withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
index 371b75a..3386aa2 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.kubernetes.kubeclient.parameters;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.StringUtils;
@@ -31,6 +32,8 @@ import java.util.Map;
import java.util.Random;
import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
/**
* General tests for the {@link AbstractKubernetesParameters}.
@@ -62,6 +65,19 @@ public class AbstractKubernetesParametersTest extends
TestLogger {
);
}
+ @Test
+ public void getConfigDirectory() {
+ final String confDir = "/path/of/flink-conf";
+ flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, confDir);
+ assertThat(testingKubernetesParameters.getConfigDirectory(),
is(confDir));
+ }
+
+ @Test
+ public void getConfigDirectoryFallbackToPodConfDir() {
+ final String confDirInPod =
flinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR);
+ assertThat(testingKubernetesParameters.getConfigDirectory(),
is(confDirInPod));
+ }
+
private class TestingKubernetesParameters extends
AbstractKubernetesParameters {
public TestingKubernetesParameters(Configuration flinkConfig) {