This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ed98b259d [flink] Optimize Hadoop configuration loading (#4241)
ed98b259d is described below

commit ed98b259da632df8407d99651a7de55e4e4931a3
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Tue Sep 24 17:55:52 2024 +0800

    [flink] Optimize Hadoop configuration loading (#4241)
---
 .../java/org/apache/paimon/utils/HadoopUtils.java  | 47 +++++++----
 .../apache/paimon/fs/HadoopConfigLoadingTest.java  | 96 ++++++++++------------
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 44 +---------
 3 files changed, 77 insertions(+), 110 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
index d931b63d5..be25fce4d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
@@ -90,11 +90,16 @@ public class HadoopUtils {
         // a configuration
         // file with higher priority should be added later.
 
-        // Approach 1: HADOOP_HOME environment variables
-        String[] possibleHadoopConfPaths = new String[2];
-
         HadoopConfigLoader loader = options.get(HADOOP_CONF_LOADER);
 
+        // The HDFS configuration priority from low to high is as follows:
+        // 1. HADOOP_HOME
+        // 2. HADOOP_CONF_DIR
+        // 3. Paimon catalog or paimon table configuration(hadoop-conf-dir)
+        // 4. paimon table advanced configurations
+
+        // Approach 1: HADOOP_HOME environment variables
+        String[] possibleHadoopConfPaths = new String[2];
         final String hadoopHomeDir = System.getenv(HADOOP_HOME_ENV);
         if (hadoopHomeDir != null && loader.loadEnv()) {
             LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: 
{}", hadoopHomeDir);
@@ -109,26 +114,25 @@ public class HadoopUtils {
             }
         }
 
-        // Approach 2: Paimon Catalog Option
-        final String hadoopConfigPath = options.getString(PATH_HADOOP_CONFIG, 
null);
-        if (!StringUtils.isNullOrWhitespaceOnly(hadoopConfigPath) && 
loader.loadOption()) {
-            LOG.debug(
-                    "Searching Hadoop configuration files in Paimon config: 
{}", hadoopConfigPath);
+        // Approach 2: HADOOP_CONF_DIR environment variable
+        String hadoopConfDir = System.getenv(HADOOP_CONF_ENV);
+        if (!StringUtils.isNullOrWhitespaceOnly(hadoopConfDir) && 
loader.loadEnv()) {
+            LOG.debug("Searching Hadoop configuration files in 
HADOOP_CONF_DIR: {}", hadoopConfDir);
             foundHadoopConfiguration =
-                    addHadoopConfIfFound(result, hadoopConfigPath, options)
+                    addHadoopConfIfFound(result, hadoopConfDir, options)
                             || foundHadoopConfiguration;
         }
 
-        // Approach 3: HADOOP_CONF_DIR environment variable
-        String hadoopConfDir = System.getenv(HADOOP_CONF_ENV);
-        if (!StringUtils.isNullOrWhitespaceOnly(hadoopConfDir) && 
loader.loadEnv()) {
-            LOG.debug("Searching Hadoop configuration files in 
HADOOP_CONF_DIR: {}", hadoopConfDir);
+        // Approach 3: Paimon table or paimon catalog hadoop conf
+        hadoopConfDir = options.getString(PATH_HADOOP_CONFIG, null);
+        if (!StringUtils.isNullOrWhitespaceOnly(hadoopConfDir) && 
loader.loadOption()) {
+            LOG.debug("Searching Hadoop configuration files in Paimon config: 
{}", hadoopConfDir);
             foundHadoopConfiguration =
                     addHadoopConfIfFound(result, hadoopConfDir, options)
                             || foundHadoopConfiguration;
         }
 
-        // Approach 4: Paimon configuration
+        // Approach 4: Paimon advanced configuration
         // add all configuration key with prefix 'hadoop.' in Paimon conf to 
hadoop conf
         for (String key : options.keySet()) {
             for (String prefix : CONFIG_PREFIXES) {
@@ -157,7 +161,7 @@ public class HadoopUtils {
      * Search Hadoop configuration files in the given path, and add them to 
the configuration if
      * found.
      */
-    private static boolean addHadoopConfIfFound(
+    public static boolean addHadoopConfIfFound(
             Configuration configuration, String possibleHadoopConfPath, 
Options options) {
         Path root = new Path(possibleHadoopConfPath);
 
@@ -185,6 +189,17 @@ public class HadoopUtils {
                                     + "/hdfs-site.xml to hadoop 
configuration");
                     foundHadoopConfiguration = true;
                 }
+
+                // Add mapred-site.xml. We need to read configurations like 
compression codec.
+                path = new Path(possibleHadoopConfPath, "mapred-site.xml");
+                if (fileIO.exists(path)) {
+                    readHadoopXml(fileIO.readFileUtf8(path), configuration);
+                    LOG.debug(
+                            "Adding "
+                                    + possibleHadoopConfPath
+                                    + "/mapred-site.xml to hadoop 
configuration");
+                    foundHadoopConfiguration = true;
+                }
             }
 
             return foundHadoopConfiguration;
@@ -222,7 +237,7 @@ public class HadoopUtils {
     public enum HadoopConfigLoader implements DescribedEnum {
         ALL("all", "Load Hadoop conf from environment variables and catalog 
option.", true, true),
         ENV("env", "Load Hadoop conf from environment variables only.", true, 
false),
-        OPTION("option", "Load Hadoop conf from catalog option only.", false, 
true);
+        OPTION("option", "Load Hadoop conf from catalog or table option 
only.", false, true);
 
         private final String value;
         private final String description;
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
index d4a20e260..9ba94b5b0 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
@@ -151,99 +151,91 @@ public class HadoopConfigLoadingTest {
 
     @Test
     public void loadOverlappingConfig() throws Exception {
-        final String k1 = "key1";
-        final String k2 = "key2";
-        final String k3 = "key3";
-        final String k4 = "key4";
-        final String k5 = "key5";
+        final String k1 = "key";
 
-        final String v1 = "from HADOOP_CONF_DIR";
-        final String v2 = "from Paimon config `hadoop-conf-dir`";
-        final String v4 = "from HADOOP_HOME/etc/hadoop";
-        final String v5 = "from HADOOP_HOME/conf";
+        final String v1 = "from HADOOP_HOME/etc/hadoop or HADOOP_HOME/conf";
+        final String v2 = "from HADOOP_CONF_DIR";
+        final String v3 = "from Paimon config hadoop-conf-dir";
+        final String v4 = "from Paimon advanced configuration";
 
-        final File hadoopConfDir = tempDir.resolve("hadoopConfDir").toFile();
-        final File hadoopConfEntryDir = 
tempDir.resolve("hadoopConfEntryDir").toFile();
-        final File legacyConfDir = tempDir.resolve("legacyConfDir").toFile();
         final File hadoopHome = tempDir.resolve("hadoopHome").toFile();
-
         final File hadoopHomeConf = 
tempDir.resolve("hadoopHome/conf").toFile();
         final File hadoopHomeEtc = 
tempDir.resolve("hadoopHome/etc/hadoop").toFile();
+        final File hadoopConfDir = tempDir.resolve("hadoopConfDir").toFile();
+        final File hadoopPaimonConfDir = 
tempDir.resolve("hadoopPaimonConfDir").toFile();
 
-        assertThat(hadoopConfDir.mkdirs()).isTrue();
-        assertThat(hadoopConfEntryDir.mkdirs()).isTrue();
-        assertThat(legacyConfDir.mkdirs()).isTrue();
         assertThat(hadoopHomeConf.mkdirs()).isTrue();
         assertThat(hadoopHomeEtc.mkdirs()).isTrue();
+        assertThat(hadoopConfDir.mkdirs()).isTrue();
+        assertThat(hadoopPaimonConfDir.mkdirs()).isTrue();
 
-        final File file1 = new File(hadoopConfDir, "core-site.xml");
-        final File file2 = new File(hadoopConfEntryDir, "core-site.xml");
-        final File file4 = new File(hadoopHomeEtc, "core-site.xml");
-        final File file5 = new File(hadoopHomeConf, "core-site.xml");
+        final File fileHomeEtc = new File(hadoopHomeEtc, "core-site.xml");
+        final File fileHomeConf = new File(hadoopHomeConf, "core-site.xml");
+        final File fileConfDir = new File(hadoopConfDir, "core-site.xml");
+        final File filePaimonConfDir = new File(hadoopPaimonConfDir, 
"core-site.xml");
 
-        printConfig(file1, k1, v1);
+        Map<String, String> properties1 = new HashMap<>();
+        properties1.put(k1, v1);
+        printConfigs(fileHomeEtc, properties1);
+        printConfigs(fileHomeConf, properties1);
 
         Map<String, String> properties2 = new HashMap<>();
         properties2.put(k1, v2);
-        properties2.put(k2, v2);
-        printConfigs(file2, properties2);
-
-        Map<String, String> properties4 = new HashMap<>();
-        properties4.put(k1, v4);
-        properties4.put(k2, v4);
-        properties4.put(k3, v4);
-        properties4.put(k4, v4);
-        printConfigs(file4, properties4);
-
-        Map<String, String> properties5 = new HashMap<>();
-        properties5.put(k1, v5);
-        properties5.put(k2, v5);
-        properties5.put(k3, v5);
-        properties5.put(k4, v5);
-        properties5.put(k5, v5);
-        printConfigs(file5, properties5);
+        printConfigs(fileConfDir, properties2);
+
+        Map<String, String> properties3 = new HashMap<>();
+        properties3.put(k1, v3);
+        printConfigs(filePaimonConfDir, properties3);
 
         final Options options = new Options();
-        options.setString(HadoopUtils.PATH_HADOOP_CONFIG, 
hadoopConfEntryDir.getAbsolutePath());
 
         final Configuration hadoopConf1;
         final Configuration hadoopConf2;
         final Configuration hadoopConf3;
+        final Configuration hadoopConf4;
+        final Configuration hadoopConf5;
 
         final Map<String, String> originalEnv = System.getenv();
         final Map<String, String> newEnv = new HashMap<>(originalEnv);
-        newEnv.put(HadoopUtils.HADOOP_CONF_ENV, 
hadoopConfDir.getAbsolutePath());
         newEnv.put(HadoopUtils.HADOOP_HOME_ENV, hadoopHome.getAbsolutePath());
+        final Map<String, String> newEnv1 = new HashMap<>(newEnv);
+        newEnv1.put(HadoopUtils.HADOOP_CONF_ENV, 
hadoopConfDir.getAbsolutePath());
         try {
+            options.set(HadoopUtils.HADOOP_CONF_LOADER, 
HadoopUtils.HadoopConfigLoader.ENV);
             CommonTestUtils.setEnv(newEnv);
             hadoopConf1 = HadoopUtils.getHadoopConfiguration(options);
 
-            options.set(HadoopUtils.HADOOP_CONF_LOADER, 
HadoopUtils.HadoopConfigLoader.ENV);
+            CommonTestUtils.setEnv(newEnv1);
             hadoopConf2 = HadoopUtils.getHadoopConfiguration(options);
 
             options.set(HadoopUtils.HADOOP_CONF_LOADER, 
HadoopUtils.HadoopConfigLoader.OPTION);
+            options.setString(
+                    HadoopUtils.PATH_HADOOP_CONFIG, 
hadoopPaimonConfDir.getAbsolutePath());
             hadoopConf3 = HadoopUtils.getHadoopConfiguration(options);
+
+            options.set(HadoopUtils.HADOOP_CONF_LOADER, 
HadoopUtils.HadoopConfigLoader.ALL);
+            hadoopConf4 = HadoopUtils.getHadoopConfiguration(options);
+
+            options.set("hadoop." + k1, v4);
+            hadoopConf5 = HadoopUtils.getHadoopConfiguration(options);
         } finally {
             CommonTestUtils.setEnv(originalEnv);
         }
 
         assertThat(hadoopConf1.get(k1, null)).isEqualTo(v1);
-        assertThat(hadoopConf1.get(k2, null)).isEqualTo(v2);
-        assertThat(hadoopConf1.get(k4, null)).isEqualTo(v4);
-        assertThat(hadoopConf1.get(k5, null)).isEqualTo(v5);
         assertThat(hadoopConf1.get(IN_CP_CONFIG_KEY, 
null)).isEqualTo(IN_CP_CONFIG_VALUE);
 
-        assertThat(hadoopConf2.get(k1, null)).isEqualTo("from 
HADOOP_CONF_DIR");
-        assertThat(hadoopConf2.get(k2, null)).isEqualTo("from 
HADOOP_HOME/etc/hadoop");
-        assertThat(hadoopConf2.get(k4, null)).isEqualTo("from 
HADOOP_HOME/etc/hadoop");
-        assertThat(hadoopConf2.get(k5, null)).isEqualTo("from 
HADOOP_HOME/conf");
+        assertThat(hadoopConf2.get(k1, null)).isEqualTo(v2);
         assertThat(hadoopConf2.get(IN_CP_CONFIG_KEY, 
null)).isEqualTo(IN_CP_CONFIG_VALUE);
 
-        assertThat(hadoopConf3.get(k1, null)).isEqualTo("from Paimon config 
`hadoop-conf-dir`");
-        assertThat(hadoopConf3.get(k2, null)).isEqualTo("from Paimon config 
`hadoop-conf-dir`");
-        assertThat(hadoopConf3.get(k4, null)).isNull();
-        assertThat(hadoopConf3.get(k5, null)).isNull();
+        assertThat(hadoopConf3.get(k1, null)).isEqualTo(v3);
         assertThat(hadoopConf3.get(IN_CP_CONFIG_KEY, 
null)).isEqualTo(IN_CP_CONFIG_VALUE);
+
+        assertThat(hadoopConf4.get(k1, null)).isEqualTo(v3);
+        assertThat(hadoopConf4.get(IN_CP_CONFIG_KEY, 
null)).isEqualTo(IN_CP_CONFIG_VALUE);
+
+        assertThat(hadoopConf5.get(k1, null)).isEqualTo(v4);
+        assertThat(hadoopConf5.get(IN_CP_CONFIG_KEY, 
null)).isEqualTo(IN_CP_CONFIG_VALUE);
     }
 
     @Test
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 4e590d905..2fe2cbf26 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -96,6 +96,7 @@ import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
 
@@ -927,8 +928,7 @@ public class HiveCatalog extends AbstractCatalog {
         // create HiveConf from hadoop configuration with hadoop conf 
directory configured.
         Configuration hadoopConf = defaultHadoopConf;
         if (!isNullOrWhitespaceOnly(hadoopConfDir)) {
-            hadoopConf = getHadoopConfiguration(hadoopConfDir);
-            if (hadoopConf == null) {
+            if (!addHadoopConfIfFound(hadoopConf, hadoopConfDir, new 
Options())) {
                 String possiableUsedConfFiles =
                         "core-site.xml | hdfs-site.xml | yarn-site.xml | 
mapred-site.xml";
                 throw new RuntimeException(
@@ -1032,46 +1032,6 @@ public class HiveCatalog extends AbstractCatalog {
         return hiveConf;
     }
 
-    /**
-     * Returns a new Hadoop Configuration object using the path to the hadoop 
conf configured.
-     *
-     * @param hadoopConfDir Hadoop conf directory path.
-     * @return A Hadoop configuration instance.
-     */
-    public static Configuration getHadoopConfiguration(String hadoopConfDir) {
-        if (new File(hadoopConfDir).exists()) {
-            List<File> possiableConfFiles = new ArrayList<File>();
-            File coreSite = new File(hadoopConfDir, "core-site.xml");
-            if (coreSite.exists()) {
-                possiableConfFiles.add(coreSite);
-            }
-            File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
-            if (hdfsSite.exists()) {
-                possiableConfFiles.add(hdfsSite);
-            }
-            File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
-            if (yarnSite.exists()) {
-                possiableConfFiles.add(yarnSite);
-            }
-            // Add mapred-site.xml. We need to read configurations like 
compression codec.
-            File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
-            if (mapredSite.exists()) {
-                possiableConfFiles.add(mapredSite);
-            }
-            if (possiableConfFiles.isEmpty()) {
-                return null;
-            } else {
-                Configuration hadoopConfiguration = new Configuration();
-                for (File confFile : possiableConfFiles) {
-                    hadoopConfiguration.addResource(
-                            new 
org.apache.hadoop.fs.Path(confFile.getAbsolutePath()));
-                }
-                return hadoopConfiguration;
-            }
-        }
-        return null;
-    }
-
     public static String possibleHiveConfPath() {
         return System.getenv("HIVE_CONF_DIR");
     }

Reply via email to