This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ed98b259d [flink] Optimize Hadoop configuration loading (#4241)
ed98b259d is described below
commit ed98b259da632df8407d99651a7de55e4e4931a3
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Tue Sep 24 17:55:52 2024 +0800
[flink] Optimize Hadoop configuration loading (#4241)
---
.../java/org/apache/paimon/utils/HadoopUtils.java | 47 +++++++----
.../apache/paimon/fs/HadoopConfigLoadingTest.java | 96 ++++++++++------------
.../java/org/apache/paimon/hive/HiveCatalog.java | 44 +---------
3 files changed, 77 insertions(+), 110 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
index d931b63d5..be25fce4d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
@@ -90,11 +90,16 @@ public class HadoopUtils {
// a configuration
// file with higher priority should be added later.
- // Approach 1: HADOOP_HOME environment variables
- String[] possibleHadoopConfPaths = new String[2];
-
HadoopConfigLoader loader = options.get(HADOOP_CONF_LOADER);
+ // The Hadoop configuration priority, from low to high, is as follows:
+ // 1. HADOOP_HOME
+ // 2. HADOOP_CONF_DIR
+ // 3. Paimon catalog or Paimon table configuration (hadoop-conf-dir)
+ // 4. Paimon table advanced configurations
+
+ // Approach 1: HADOOP_HOME environment variables
+ String[] possibleHadoopConfPaths = new String[2];
final String hadoopHomeDir = System.getenv(HADOOP_HOME_ENV);
if (hadoopHomeDir != null && loader.loadEnv()) {
LOG.debug("Searching Hadoop configuration files in HADOOP_HOME:
{}", hadoopHomeDir);
@@ -109,26 +114,25 @@ public class HadoopUtils {
}
}
- // Approach 2: Paimon Catalog Option
- final String hadoopConfigPath = options.getString(PATH_HADOOP_CONFIG,
null);
- if (!StringUtils.isNullOrWhitespaceOnly(hadoopConfigPath) &&
loader.loadOption()) {
- LOG.debug(
- "Searching Hadoop configuration files in Paimon config:
{}", hadoopConfigPath);
+ // Approach 2: HADOOP_CONF_DIR environment variable
+ String hadoopConfDir = System.getenv(HADOOP_CONF_ENV);
+ if (!StringUtils.isNullOrWhitespaceOnly(hadoopConfDir) &&
loader.loadEnv()) {
+ LOG.debug("Searching Hadoop configuration files in
HADOOP_CONF_DIR: {}", hadoopConfDir);
foundHadoopConfiguration =
- addHadoopConfIfFound(result, hadoopConfigPath, options)
+ addHadoopConfIfFound(result, hadoopConfDir, options)
|| foundHadoopConfiguration;
}
- // Approach 3: HADOOP_CONF_DIR environment variable
- String hadoopConfDir = System.getenv(HADOOP_CONF_ENV);
- if (!StringUtils.isNullOrWhitespaceOnly(hadoopConfDir) &&
loader.loadEnv()) {
- LOG.debug("Searching Hadoop configuration files in
HADOOP_CONF_DIR: {}", hadoopConfDir);
+ // Approach 3: Paimon table or Paimon catalog Hadoop conf
+ hadoopConfDir = options.getString(PATH_HADOOP_CONFIG, null);
+ if (!StringUtils.isNullOrWhitespaceOnly(hadoopConfDir) &&
loader.loadOption()) {
+ LOG.debug("Searching Hadoop configuration files in Paimon config:
{}", hadoopConfDir);
foundHadoopConfiguration =
addHadoopConfIfFound(result, hadoopConfDir, options)
|| foundHadoopConfiguration;
}
- // Approach 4: Paimon configuration
+ // Approach 4: Paimon advanced configuration
// add all configuration key with prefix 'hadoop.' in Paimon conf to
hadoop conf
for (String key : options.keySet()) {
for (String prefix : CONFIG_PREFIXES) {
@@ -157,7 +161,7 @@ public class HadoopUtils {
* Search Hadoop configuration files in the given path, and add them to
the configuration if
* found.
*/
- private static boolean addHadoopConfIfFound(
+ public static boolean addHadoopConfIfFound(
Configuration configuration, String possibleHadoopConfPath,
Options options) {
Path root = new Path(possibleHadoopConfPath);
@@ -185,6 +189,17 @@ public class HadoopUtils {
+ "/hdfs-site.xml to hadoop
configuration");
foundHadoopConfiguration = true;
}
+
+ // Add mapred-site.xml. We need to read configurations like
compression codec.
+ path = new Path(possibleHadoopConfPath, "mapred-site.xml");
+ if (fileIO.exists(path)) {
+ readHadoopXml(fileIO.readFileUtf8(path), configuration);
+ LOG.debug(
+ "Adding "
+ + possibleHadoopConfPath
+ + "/mapred-site.xml to hadoop
configuration");
+ foundHadoopConfiguration = true;
+ }
}
return foundHadoopConfiguration;
@@ -222,7 +237,7 @@ public class HadoopUtils {
public enum HadoopConfigLoader implements DescribedEnum {
ALL("all", "Load Hadoop conf from environment variables and catalog
option.", true, true),
ENV("env", "Load Hadoop conf from environment variables only.", true,
false),
- OPTION("option", "Load Hadoop conf from catalog option only.", false,
true);
+ OPTION("option", "Load Hadoop conf from catalog or table option
only.", false, true);
private final String value;
private final String description;
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
index d4a20e260..9ba94b5b0 100644
---
a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
@@ -151,99 +151,91 @@ public class HadoopConfigLoadingTest {
@Test
public void loadOverlappingConfig() throws Exception {
- final String k1 = "key1";
- final String k2 = "key2";
- final String k3 = "key3";
- final String k4 = "key4";
- final String k5 = "key5";
+ final String k1 = "key";
- final String v1 = "from HADOOP_CONF_DIR";
- final String v2 = "from Paimon config `hadoop-conf-dir`";
- final String v4 = "from HADOOP_HOME/etc/hadoop";
- final String v5 = "from HADOOP_HOME/conf";
+ final String v1 = "from HADOOP_HOME/etc/hadoop or HADOOP_HOME/conf";
+ final String v2 = "from HADOOP_CONF_DIR";
+ final String v3 = "from Paimon config hadoop-conf-dir";
+ final String v4 = "from Paimon advanced configuration";
- final File hadoopConfDir = tempDir.resolve("hadoopConfDir").toFile();
- final File hadoopConfEntryDir =
tempDir.resolve("hadoopConfEntryDir").toFile();
- final File legacyConfDir = tempDir.resolve("legacyConfDir").toFile();
final File hadoopHome = tempDir.resolve("hadoopHome").toFile();
-
final File hadoopHomeConf =
tempDir.resolve("hadoopHome/conf").toFile();
final File hadoopHomeEtc =
tempDir.resolve("hadoopHome/etc/hadoop").toFile();
+ final File hadoopConfDir = tempDir.resolve("hadoopConfDir").toFile();
+ final File hadoopPaimonConfDir =
tempDir.resolve("hadoopPaimonConfDir").toFile();
- assertThat(hadoopConfDir.mkdirs()).isTrue();
- assertThat(hadoopConfEntryDir.mkdirs()).isTrue();
- assertThat(legacyConfDir.mkdirs()).isTrue();
assertThat(hadoopHomeConf.mkdirs()).isTrue();
assertThat(hadoopHomeEtc.mkdirs()).isTrue();
+ assertThat(hadoopConfDir.mkdirs()).isTrue();
+ assertThat(hadoopPaimonConfDir.mkdirs()).isTrue();
- final File file1 = new File(hadoopConfDir, "core-site.xml");
- final File file2 = new File(hadoopConfEntryDir, "core-site.xml");
- final File file4 = new File(hadoopHomeEtc, "core-site.xml");
- final File file5 = new File(hadoopHomeConf, "core-site.xml");
+ final File fileHomeEtc = new File(hadoopHomeEtc, "core-site.xml");
+ final File fileHomeConf = new File(hadoopHomeConf, "core-site.xml");
+ final File fileConfDir = new File(hadoopConfDir, "core-site.xml");
+ final File filePaimonConfDir = new File(hadoopPaimonConfDir,
"core-site.xml");
- printConfig(file1, k1, v1);
+ Map<String, String> properties1 = new HashMap<>();
+ properties1.put(k1, v1);
+ printConfigs(fileHomeEtc, properties1);
+ printConfigs(fileHomeConf, properties1);
Map<String, String> properties2 = new HashMap<>();
properties2.put(k1, v2);
- properties2.put(k2, v2);
- printConfigs(file2, properties2);
-
- Map<String, String> properties4 = new HashMap<>();
- properties4.put(k1, v4);
- properties4.put(k2, v4);
- properties4.put(k3, v4);
- properties4.put(k4, v4);
- printConfigs(file4, properties4);
-
- Map<String, String> properties5 = new HashMap<>();
- properties5.put(k1, v5);
- properties5.put(k2, v5);
- properties5.put(k3, v5);
- properties5.put(k4, v5);
- properties5.put(k5, v5);
- printConfigs(file5, properties5);
+ printConfigs(fileConfDir, properties2);
+
+ Map<String, String> properties3 = new HashMap<>();
+ properties3.put(k1, v3);
+ printConfigs(filePaimonConfDir, properties3);
final Options options = new Options();
- options.setString(HadoopUtils.PATH_HADOOP_CONFIG,
hadoopConfEntryDir.getAbsolutePath());
final Configuration hadoopConf1;
final Configuration hadoopConf2;
final Configuration hadoopConf3;
+ final Configuration hadoopConf4;
+ final Configuration hadoopConf5;
final Map<String, String> originalEnv = System.getenv();
final Map<String, String> newEnv = new HashMap<>(originalEnv);
- newEnv.put(HadoopUtils.HADOOP_CONF_ENV,
hadoopConfDir.getAbsolutePath());
newEnv.put(HadoopUtils.HADOOP_HOME_ENV, hadoopHome.getAbsolutePath());
+ final Map<String, String> newEnv1 = new HashMap<>(newEnv);
+ newEnv1.put(HadoopUtils.HADOOP_CONF_ENV,
hadoopConfDir.getAbsolutePath());
try {
+ options.set(HadoopUtils.HADOOP_CONF_LOADER,
HadoopUtils.HadoopConfigLoader.ENV);
CommonTestUtils.setEnv(newEnv);
hadoopConf1 = HadoopUtils.getHadoopConfiguration(options);
- options.set(HadoopUtils.HADOOP_CONF_LOADER,
HadoopUtils.HadoopConfigLoader.ENV);
+ CommonTestUtils.setEnv(newEnv1);
hadoopConf2 = HadoopUtils.getHadoopConfiguration(options);
options.set(HadoopUtils.HADOOP_CONF_LOADER,
HadoopUtils.HadoopConfigLoader.OPTION);
+ options.setString(
+ HadoopUtils.PATH_HADOOP_CONFIG,
hadoopPaimonConfDir.getAbsolutePath());
hadoopConf3 = HadoopUtils.getHadoopConfiguration(options);
+
+ options.set(HadoopUtils.HADOOP_CONF_LOADER,
HadoopUtils.HadoopConfigLoader.ALL);
+ hadoopConf4 = HadoopUtils.getHadoopConfiguration(options);
+
+ options.set("hadoop." + k1, v4);
+ hadoopConf5 = HadoopUtils.getHadoopConfiguration(options);
} finally {
CommonTestUtils.setEnv(originalEnv);
}
assertThat(hadoopConf1.get(k1, null)).isEqualTo(v1);
- assertThat(hadoopConf1.get(k2, null)).isEqualTo(v2);
- assertThat(hadoopConf1.get(k4, null)).isEqualTo(v4);
- assertThat(hadoopConf1.get(k5, null)).isEqualTo(v5);
assertThat(hadoopConf1.get(IN_CP_CONFIG_KEY,
null)).isEqualTo(IN_CP_CONFIG_VALUE);
- assertThat(hadoopConf2.get(k1, null)).isEqualTo("from
HADOOP_CONF_DIR");
- assertThat(hadoopConf2.get(k2, null)).isEqualTo("from
HADOOP_HOME/etc/hadoop");
- assertThat(hadoopConf2.get(k4, null)).isEqualTo("from
HADOOP_HOME/etc/hadoop");
- assertThat(hadoopConf2.get(k5, null)).isEqualTo("from
HADOOP_HOME/conf");
+ assertThat(hadoopConf2.get(k1, null)).isEqualTo(v2);
assertThat(hadoopConf2.get(IN_CP_CONFIG_KEY,
null)).isEqualTo(IN_CP_CONFIG_VALUE);
- assertThat(hadoopConf3.get(k1, null)).isEqualTo("from Paimon config
`hadoop-conf-dir`");
- assertThat(hadoopConf3.get(k2, null)).isEqualTo("from Paimon config
`hadoop-conf-dir`");
- assertThat(hadoopConf3.get(k4, null)).isNull();
- assertThat(hadoopConf3.get(k5, null)).isNull();
+ assertThat(hadoopConf3.get(k1, null)).isEqualTo(v3);
assertThat(hadoopConf3.get(IN_CP_CONFIG_KEY,
null)).isEqualTo(IN_CP_CONFIG_VALUE);
+
+ assertThat(hadoopConf4.get(k1, null)).isEqualTo(v3);
+ assertThat(hadoopConf4.get(IN_CP_CONFIG_KEY,
null)).isEqualTo(IN_CP_CONFIG_VALUE);
+
+ assertThat(hadoopConf5.get(k1, null)).isEqualTo(v4);
+ assertThat(hadoopConf5.get(IN_CP_CONFIG_KEY,
null)).isEqualTo(IN_CP_CONFIG_VALUE);
}
@Test
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 4e590d905..2fe2cbf26 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -96,6 +96,7 @@ import static
org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
@@ -927,8 +928,7 @@ public class HiveCatalog extends AbstractCatalog {
// create HiveConf from hadoop configuration with hadoop conf
directory configured.
Configuration hadoopConf = defaultHadoopConf;
if (!isNullOrWhitespaceOnly(hadoopConfDir)) {
- hadoopConf = getHadoopConfiguration(hadoopConfDir);
- if (hadoopConf == null) {
+ if (!addHadoopConfIfFound(hadoopConf, hadoopConfDir, new
Options())) {
String possiableUsedConfFiles =
"core-site.xml | hdfs-site.xml | yarn-site.xml |
mapred-site.xml";
throw new RuntimeException(
@@ -1032,46 +1032,6 @@ public class HiveCatalog extends AbstractCatalog {
return hiveConf;
}
- /**
- * Returns a new Hadoop Configuration object using the path to the hadoop
conf configured.
- *
- * @param hadoopConfDir Hadoop conf directory path.
- * @return A Hadoop configuration instance.
- */
- public static Configuration getHadoopConfiguration(String hadoopConfDir) {
- if (new File(hadoopConfDir).exists()) {
- List<File> possiableConfFiles = new ArrayList<File>();
- File coreSite = new File(hadoopConfDir, "core-site.xml");
- if (coreSite.exists()) {
- possiableConfFiles.add(coreSite);
- }
- File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
- if (hdfsSite.exists()) {
- possiableConfFiles.add(hdfsSite);
- }
- File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
- if (yarnSite.exists()) {
- possiableConfFiles.add(yarnSite);
- }
- // Add mapred-site.xml. We need to read configurations like
compression codec.
- File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
- if (mapredSite.exists()) {
- possiableConfFiles.add(mapredSite);
- }
- if (possiableConfFiles.isEmpty()) {
- return null;
- } else {
- Configuration hadoopConfiguration = new Configuration();
- for (File confFile : possiableConfFiles) {
- hadoopConfiguration.addResource(
- new
org.apache.hadoop.fs.Path(confFile.getAbsolutePath()));
- }
- return hadoopConfiguration;
- }
- }
- return null;
- }
-
public static String possibleHiveConfPath() {
return System.getenv("HIVE_CONF_DIR");
}