This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new d2d4dd0b [FLINK-31292] Introduce HadoopUtils to get Configuration in CatalogContext
d2d4dd0b is described below
commit d2d4dd0bf8d206307eb2c93aa97e7c1cb750f187
Author: Yubin Li <[email protected]>
AuthorDate: Sat Mar 11 11:42:13 2023 +0800
[FLINK-31292] Introduce HadoopUtils to get Configuration in CatalogContext
This closes #592
---
 .../flink/table/store/catalog/CatalogContext.java  |   3 +-
 .../flink/table/store/utils/HadoopUtils.java       | 179 +++++++++++
 .../table/store/fs/HadoopConfigLoadingTest.java    | 331 +++++++++++++++++++++
 .../store/file/catalog/CatalogFactoryTest.java     |  19 ++
 4 files changed, 531 insertions(+), 1 deletion(-)
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/catalog/CatalogContext.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/catalog/CatalogContext.java
index 466eac3c..35c5e143 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/catalog/CatalogContext.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/catalog/CatalogContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.fs.FileIOLoader;
import org.apache.flink.table.store.fs.Path;
import org.apache.flink.table.store.fs.hadoop.HadoopFileIOLoader;
import org.apache.flink.table.store.options.Options;
+import org.apache.flink.table.store.utils.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
@@ -61,7 +62,7 @@ public class CatalogContext {
}
public static CatalogContext create(Options options) {
- return create(options, new Configuration());
+ return create(options, HadoopUtils.getHadoopConfiguration(options));
}
public static CatalogContext create(Options options, Configuration hadoopConf) {
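For context, a minimal sketch of the new code path (not part of this commit; the option keys and values below are illustrative):

    import org.apache.flink.table.store.catalog.CatalogContext;
    import org.apache.flink.table.store.options.Options;

    import org.apache.hadoop.conf.Configuration;

    public class CatalogContextExample {
        public static void main(String[] args) {
            Options options = new Options();
            // Keys prefixed with "hadoop." are forwarded into the Hadoop Configuration.
            options.setString("hadoop.fs.defaultFS", "hdfs://namenode:8020"); // illustrative value
            options.setString("hadoop.dfs.replication", "3"); // illustrative value

            // create(Options) now derives the Hadoop Configuration from the options
            // (and the environment) instead of calling new Configuration().
            CatalogContext context = CatalogContext.create(options);
            Configuration hadoopConf = context.hadoopConf();
            System.out.println(hadoopConf.get("fs.defaultFS")); // hdfs://namenode:8020
        }
    }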
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/HadoopUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/HadoopUtils.java
new file mode 100644
index 00000000..04130785
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/HadoopUtils.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import org.apache.flink.table.store.options.Options;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * Utility class for working with Hadoop-related classes. This should only be used if Hadoop is on
+ * the classpath. Note: decoupled from specific engines.
+ */
+public class HadoopUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
+ private static final String[] CONFIG_PREFIXES = {"hadoop."};
+ public static final String HADOOP_HOME_ENV = "HADOOP_HOME";
+ public static final String HADOOP_CONF_ENV = "HADOOP_CONF_DIR";
+
+ /**
+ * Path to hdfs-default.xml file.
+ *
+ * @deprecated Use environment variable HADOOP_CONF_DIR instead.
+ */
+ @Deprecated public static final String HDFS_DEFAULT_CONFIG = "fs.hdfs.hdfsdefault";
+
+ /**
+ * Path to hdfs-site.xml file.
+ *
+ * @deprecated Use environment variable HADOOP_CONF_DIR instead.
+ */
+ @Deprecated public static final String HDFS_SITE_CONFIG = "fs.hdfs.hdfssite";
+
+ /**
+ * Path to Hadoop configuration.
+ *
+ * @deprecated Use environment variable HADOOP_CONF_DIR instead.
+ */
+ @Deprecated public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
+
+ public static Configuration getHadoopConfiguration(Options options) {
+
+ // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
+ // from the classpath
+
+ Configuration result = new HdfsConfiguration();
+ boolean foundHadoopConfiguration = false;
+
+ // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
+ // the hdfs configuration.
+ // The properties of a newly added resource will override the ones in previous resources, so
+ // a configuration file with higher priority should be added later.
+
+ // Approach 1: HADOOP_HOME environment variables
+ String[] possibleHadoopConfPaths = new String[2];
+
+ final String hadoopHomeDir = System.getenv(HADOOP_HOME_ENV);
+ if (hadoopHomeDir != null) {
+ LOG.debug("Searching Hadoop configuration files in HADOOP_HOME:
{}", hadoopHomeDir);
+ possibleHadoopConfPaths[0] = hadoopHomeDir + "/conf";
+ possibleHadoopConfPaths[1] = hadoopHomeDir + "/etc/hadoop"; //
hadoop 2.2
+ }
+
+ for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+ if (possibleHadoopConfPath != null) {
+ foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
+ }
+ }
+
+ // Approach 2: Flink configuration (deprecated)
+ final String hdfsDefaultPath = options.getString(HDFS_DEFAULT_CONFIG, null);
+ if (hdfsDefaultPath != null) {
+ result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
+ LOG.debug(
+ "Using hdfs-default configuration-file path from Flink config: {}",
+ hdfsDefaultPath);
+ foundHadoopConfiguration = true;
+ }
+
+ final String hdfsSitePath = options.getString(HDFS_SITE_CONFIG, null);
+ if (hdfsSitePath != null) {
+ result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
+ LOG.debug(
+ "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
+ foundHadoopConfiguration = true;
+ }
+
+ final String hadoopConfigPath = options.getString(PATH_HADOOP_CONFIG, null);
+ if (hadoopConfigPath != null) {
+ LOG.debug("Searching Hadoop configuration files in Flink config:
{}", hadoopConfigPath);
+ foundHadoopConfiguration =
+ addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
+ }
+
+ // Approach 3: HADOOP_CONF_DIR environment variable
+ String hadoopConfDir = System.getenv(HADOOP_CONF_ENV);
+ if (hadoopConfDir != null) {
+ LOG.debug("Searching Hadoop configuration files in
HADOOP_CONF_DIR: {}", hadoopConfDir);
+ foundHadoopConfiguration =
+ addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
+ }
+
+ // Approach 4: Flink configuration
+ // add all configuration keys with the prefix 'hadoop.' in the Flink conf to the Hadoop conf
+ for (String key : options.keySet()) {
+ for (String prefix : CONFIG_PREFIXES) {
+ if (key.startsWith(prefix)) {
+ String newKey = key.substring(prefix.length());
+ String value = options.getString(key, null);
+ result.set(newKey, value);
+ LOG.debug(
+ "Adding Flink config entry for {} as {}={} to Hadoop config",
+ key,
+ newKey,
+ value);
+ foundHadoopConfiguration = true;
+ }
+ }
+ }
+
+ if (!foundHadoopConfiguration) {
+ LOG.warn("Could not find Hadoop configuration via any of the
supported methods");
+ }
+
+ return result;
+ }
+
+ /**
+ * Search Hadoop configuration files in the given path, and add them to the configuration if
+ * found.
+ */
+ private static boolean addHadoopConfIfFound(
+ Configuration configuration, String possibleHadoopConfPath) {
+ boolean foundHadoopConfiguration = false;
+ if (new File(possibleHadoopConfPath).exists()) {
+ if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+ configuration.addResource(
+ new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+ LOG.debug(
+ "Adding "
+ + possibleHadoopConfPath
+ + "/core-site.xml to hadoop configuration");
+ foundHadoopConfiguration = true;
+ }
+ if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+ configuration.addResource(
+ new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+ LOG.debug(
+ "Adding "
+ + possibleHadoopConfPath
+ + "/hdfs-site.xml to hadoop configuration");
+ foundHadoopConfiguration = true;
+ }
+ }
+ return foundHadoopConfiguration;
+ }
+}
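A rough usage sketch of the lookup order implemented above (paths and values are illustrative): resources added later override earlier ones, so HADOOP_CONF_DIR takes precedence over the deprecated config entries, which take precedence over HADOOP_HOME, and "hadoop."-prefixed options are set last and win over all XML resources.

    import org.apache.flink.table.store.options.Options;
    import org.apache.flink.table.store.utils.HadoopUtils;

    import org.apache.hadoop.conf.Configuration;

    public class HadoopUtilsExample {
        public static void main(String[] args) {
            Options options = new Options();
            // Deprecated file-based entry (approach 2); illustrative path.
            options.setString(HadoopUtils.HDFS_SITE_CONFIG, "/etc/hadoop/conf/hdfs-site.xml");
            // A "hadoop."-prefixed key (approach 4) is applied via Configuration.set,
            // so it overrides whatever the XML resources contain; illustrative value.
            options.setString("hadoop.dfs.replication", "2");

            Configuration conf = HadoopUtils.getHadoopConfiguration(options);
            System.out.println(conf.get("dfs.replication")); // "2"
        }
    }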
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopConfigLoadingTest.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopConfigLoadingTest.java
new file mode 100644
index 00000000..23a4b604
--- /dev/null
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopConfigLoadingTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.fs;
+
+import org.apache.flink.table.store.options.Options;
+import org.apache.flink.table.store.utils.CommonTestUtils;
+import org.apache.flink.table.store.utils.HadoopUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests that validate the loading of the Hadoop configuration, relative to entries in the Flink
+ * configuration and the environment variables.
+ */
+@SuppressWarnings("deprecation")
+public class HadoopConfigLoadingTest {
+
+ private static final String IN_CP_CONFIG_KEY = "cp_conf_key";
+ private static final String IN_CP_CONFIG_VALUE = "oompf!";
+
+ private @TempDir Path tempDir;
+
+ @Test
+ public void loadFromClasspathByDefault() {
+ Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(new Options());
+
+ assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+ }
+
+ @Test
+ public void loadFromLegacyConfigEntries() throws Exception {
+ final String k1 = "k1 prop";
+ final String v1 = "value1";
+
+ final String k2 = "k2 prop";
+ final String v2 = "value2";
+
+ final File file1 = tempDir.resolve("core-site.xml").toFile();
+ final File file2 = tempDir.resolve("hdfs-site.xml").toFile();
+
+ printConfig(file1, k1, v1);
+ printConfig(file2, k2, v2);
+
+ final Options options = new Options();
+ options.set(HadoopUtils.HDFS_DEFAULT_CONFIG, file1.getAbsolutePath());
+ options.set(HadoopUtils.HDFS_SITE_CONFIG, file2.getAbsolutePath());
+
+ Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(options);
+
+ // contains extra entries
+ assertEquals(v1, hadoopConf.get(k1, null));
+ assertEquals(v2, hadoopConf.get(k2, null));
+
+ // also contains classpath defaults
+ assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+ }
+
+ @Test
+ public void loadFromHadoopConfEntry() throws Exception {
+ final String k1 = "singing?";
+ final String v1 = "rain!";
+
+ final String k2 = "dancing?";
+ final String v2 = "shower!";
+
+ final File confDir = tempDir.toFile();
+
+ final File file1 = new File(confDir, "core-site.xml");
+ final File file2 = new File(confDir, "hdfs-site.xml");
+
+ printConfig(file1, k1, v1);
+ printConfig(file2, k2, v2);
+
+ final Options options = new Options();
+ options.setString(HadoopUtils.PATH_HADOOP_CONFIG, confDir.getAbsolutePath());
+
+ Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(options);
+
+ // contains extra entries
+ assertEquals(v1, hadoopConf.get(k1, null));
+ assertEquals(v2, hadoopConf.get(k2, null));
+
+ // also contains classpath defaults
+ assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+ }
+
+ @Test
+ public void loadFromEnvVariables() throws Exception {
+ final String k1 = "where?";
+ final String v1 = "I'm on a boat";
+ final String k2 = "when?";
+ final String v2 = "midnight";
+ final String k3 = "why?";
+ final String v3 = "what do you think?";
+ final String k4 = "which way?";
+ final String v4 = "south, always south...";
+ final String k5 = "how long?";
+ final String v5 = "an eternity";
+ final String k6 = "for real?";
+ final String v6 = "quite so...";
+
+ final File hadoopConfDir = tempDir.resolve("hadoop_conf").toFile();
+
+ final File hadoopHome = tempDir.resolve("hadoop_home").toFile();
+
+ final File hadoopHomeConf = new File(hadoopHome, "conf");
+ final File hadoopHomeEtc = new File(hadoopHome, "etc/hadoop");
+
+ assertTrue(hadoopConfDir.mkdirs());
+ assertTrue(hadoopHomeConf.mkdirs());
+ assertTrue(hadoopHomeEtc.mkdirs());
+
+ final File file1 = new File(hadoopConfDir, "core-site.xml");
+ final File file2 = new File(hadoopConfDir, "hdfs-site.xml");
+ final File file3 = new File(hadoopHomeConf, "core-site.xml");
+ final File file4 = new File(hadoopHomeConf, "hdfs-site.xml");
+ final File file5 = new File(hadoopHomeEtc, "core-site.xml");
+ final File file6 = new File(hadoopHomeEtc, "hdfs-site.xml");
+
+ printConfig(file1, k1, v1);
+ printConfig(file2, k2, v2);
+ printConfig(file3, k3, v3);
+ printConfig(file4, k4, v4);
+ printConfig(file5, k5, v5);
+ printConfig(file6, k6, v6);
+
+ final Configuration hadoopConf;
+
+ final Map<String, String> originalEnv = System.getenv();
+ final Map<String, String> newEnv = new HashMap<>(originalEnv);
+ newEnv.put(HadoopUtils.HADOOP_CONF_ENV, hadoopConfDir.getAbsolutePath());
+ newEnv.put(HadoopUtils.HADOOP_HOME_ENV, hadoopHome.getAbsolutePath());
+ try {
+ CommonTestUtils.setEnv(newEnv);
+ hadoopConf = HadoopUtils.getHadoopConfiguration(new Options());
+ } finally {
+ CommonTestUtils.setEnv(originalEnv);
+ }
+
+ // contains extra entries
+ assertEquals(v1, hadoopConf.get(k1, null));
+ assertEquals(v2, hadoopConf.get(k2, null));
+ assertEquals(v3, hadoopConf.get(k3, null));
+ assertEquals(v4, hadoopConf.get(k4, null));
+ assertEquals(v5, hadoopConf.get(k5, null));
+ assertEquals(v6, hadoopConf.get(k6, null));
+
+ // also contains classpath defaults
+ assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+ }
+
+ @Test
+ public void loadOverlappingConfig() throws Exception {
+ final String k1 = "key1";
+ final String k2 = "key2";
+ final String k3 = "key3";
+ final String k4 = "key4";
+ final String k5 = "key5";
+
+ final String v1 = "from HADOOP_CONF_DIR";
+ final String v2 = "from Flink config `fs.hdfs.hadoopconf`";
+ final String v3 = "from Flink config `fs.hdfs.hdfsdefault`";
+ final String v4 = "from HADOOP_HOME/etc/hadoop";
+ final String v5 = "from HADOOP_HOME/conf";
+
+ final File hadoopConfDir = tempDir.resolve("hadoopConfDir").toFile();
+ final File hadoopConfEntryDir = tempDir.resolve("hadoopConfEntryDir").toFile();
+ final File legacyConfDir = tempDir.resolve("legacyConfDir").toFile();
+ final File hadoopHome = tempDir.resolve("hadoopHome").toFile();
+
+ final File hadoopHomeConf = tempDir.resolve("hadoopHome/conf").toFile();
+ final File hadoopHomeEtc = tempDir.resolve("hadoopHome/etc/hadoop").toFile();
+
+ assertTrue(hadoopConfDir.mkdirs());
+ assertTrue(hadoopConfEntryDir.mkdirs());
+ assertTrue(legacyConfDir.mkdirs());
+ assertTrue(hadoopHomeConf.mkdirs());
+ assertTrue(hadoopHomeEtc.mkdirs());
+
+ final File file1 = new File(hadoopConfDir, "core-site.xml");
+ final File file2 = new File(hadoopConfEntryDir, "core-site.xml");
+ final File file3 = new File(legacyConfDir, "core-site.xml");
+ final File file4 = new File(hadoopHomeEtc, "core-site.xml");
+ final File file5 = new File(hadoopHomeConf, "core-site.xml");
+
+ printConfig(file1, k1, v1);
+
+ Map<String, String> properties2 = new HashMap<>();
+ properties2.put(k1, v2);
+ properties2.put(k2, v2);
+ printConfigs(file2, properties2);
+
+ Map<String, String> properties3 = new HashMap<>();
+ properties3.put(k1, v3);
+ properties3.put(k2, v3);
+ properties3.put(k3, v3);
+ printConfigs(file3, properties3);
+
+ Map<String, String> properties4 = new HashMap<>();
+ properties4.put(k1, v4);
+ properties4.put(k2, v4);
+ properties4.put(k3, v4);
+ properties4.put(k4, v4);
+ printConfigs(file4, properties4);
+
+ Map<String, String> properties5 = new HashMap<>();
+ properties5.put(k1, v5);
+ properties5.put(k2, v5);
+ properties5.put(k3, v5);
+ properties5.put(k4, v5);
+ properties5.put(k5, v5);
+ printConfigs(file5, properties5);
+
+ final Options options = new Options();
+ options.setString(HadoopUtils.PATH_HADOOP_CONFIG, hadoopConfEntryDir.getAbsolutePath());
+ options.setString(HadoopUtils.HDFS_DEFAULT_CONFIG, file3.getAbsolutePath());
+
+ final Configuration hadoopConf;
+
+ final Map<String, String> originalEnv = System.getenv();
+ final Map<String, String> newEnv = new HashMap<>(originalEnv);
+ newEnv.put(HadoopUtils.HADOOP_CONF_ENV, hadoopConfDir.getAbsolutePath());
+ newEnv.put(HadoopUtils.HADOOP_HOME_ENV, hadoopHome.getAbsolutePath());
+ try {
+ CommonTestUtils.setEnv(newEnv);
+ hadoopConf = HadoopUtils.getHadoopConfiguration(options);
+ } finally {
+ CommonTestUtils.setEnv(originalEnv);
+ }
+
+ // contains extra entries
+ assertEquals(v1, hadoopConf.get(k1, null));
+ assertEquals(v2, hadoopConf.get(k2, null));
+ assertEquals(v3, hadoopConf.get(k3, null));
+ assertEquals(v4, hadoopConf.get(k4, null));
+ assertEquals(v5, hadoopConf.get(k5, null));
+
+ // also contains classpath defaults
+ assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+ }
+
+ @Test
+ public void loadFromFlinkConfEntry() throws Exception {
+ final String prefix = "hadoop.";
+
+ final String k1 = "brooklyn";
+ final String v1 = "nets";
+
+ final String k2 = "miami";
+ final String v2 = "heat";
+
+ final String k3 = "philadelphia";
+ final String v3 = "76ers";
+
+ final String k4 = "golden.state";
+ final String v4 = "warriors";
+
+ final String k5 = "oklahoma.city";
+ final String v5 = "thunders";
+
+ final Options options = new Options();
+ options.setString(prefix + k1, v1);
+ options.setString(prefix + k2, v2);
+ options.setString(prefix + k3, v3);
+ options.setString(prefix + k4, v4);
+ options.setString(k5, v5);
+
+ Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(options);
+
+ // contains extra entries
+ assertEquals(v1, hadoopConf.get(k1, null));
+ assertEquals(v2, hadoopConf.get(k2, null));
+ assertEquals(v3, hadoopConf.get(k3, null));
+ assertEquals(v4, hadoopConf.get(k4, null));
+ assertNull(hadoopConf.get(k5));
+
+ // also contains classpath defaults
+ assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+ }
+
+ private static void printConfig(File file, String key, String value) throws IOException {
+ Map<String, String> map = new HashMap<>(1);
+ map.put(key, value);
+ printConfigs(file, map);
+ }
+
+ private static void printConfigs(File file, Map<String, String>
properties) throws IOException {
+ try (PrintStream out = new
PrintStream(Files.newOutputStream(file.toPath()))) {
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<?xml-stylesheet type=\"text/xsl\"
href=\"configuration.xsl\"?>");
+ out.println("<configuration>");
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ out.println("\t<property>");
+ out.println("\t\t<name>" + entry.getKey() + "</name>");
+ out.println("\t\t<value>" + entry.getValue() + "</value>");
+ out.println("\t</property>");
+ }
+ out.println("</configuration>");
+ }
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/CatalogFactoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/CatalogFactoryTest.java
index f5a777ea..23b78ea4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/CatalogFactoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/CatalogFactoryTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.store.fs.local.LocalFileIO;
import org.apache.flink.table.store.options.Options;
import org.apache.flink.table.store.table.TableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -66,4 +68,21 @@ public class CatalogFactoryTest {
assertThatThrownBy(() -> CatalogFactory.createCatalog(CatalogContext.create(options)))
.hasMessageContaining("Only managed table is supported in File system catalog.");
}
+
+ @Test
+ public void testContextDefaultHadoopConf(@TempDir java.nio.file.Path path) {
+ Path root = new Path(path.toUri().toString());
+ String defaultFS = "master:9999";
+ String replication = "8";
+
+ Options options = new Options();
+ options.set(WAREHOUSE, new Path(root, "warehouse").toString());
+ options.set("hadoop.fs.defaultFS", defaultFS);
+ options.set("hadoop.dfs.replication", replication);
+ Configuration conf = CatalogContext.create(options).hadoopConf();
+
+ assertThat(conf).isInstanceOf(HdfsConfiguration.class);
+ assertThat(conf.get("fs.defaultFS")).isEqualTo(defaultFS);
+ assertThat(conf.get("dfs.replication")).isEqualTo(replication);
+ }
}