This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ffb355578 [hive] Remove PaimonJobConf and clean up 
LocationKeyExtractor (#1558)
ffb355578 is described below

commit ffb3555780865cd6f26764e88a7db5a1dc3b0cce
Author: tsreaper <[email protected]>
AuthorDate: Thu Jul 13 14:10:49 2023 +0800

    [hive] Remove PaimonJobConf and clean up LocationKeyExtractor (#1558)
    
    This closes #1558.
---
 paimon-hive/paimon-hive-common/pom.xml             |  17 ++
 .../apache/paimon/hive/LocationKeyExtractor.java   | 178 ++++++++++++++++++---
 .../java/org/apache/paimon/hive/HiveSchema.java    |   5 +-
 .../java/org/apache/paimon/hive/PaimonJobConf.java |  89 -----------
 .../org/apache/paimon/hive/PaimonMetaHook.java     |   7 +-
 .../apache/paimon/hive/PaimonStorageHandler.java   |  17 +-
 .../paimon/hive/mapred/PaimonInputFormat.java      |  17 +-
 .../org/apache/paimon/hive/utils/HiveUtils.java    |  29 +++-
 .../org/apache/paimon/hive/CreateTableITCase.java  |   2 +-
 9 files changed, 225 insertions(+), 136 deletions(-)

diff --git a/paimon-hive/paimon-hive-common/pom.xml 
b/paimon-hive/paimon-hive-common/pom.xml
index 1980d3a68..7216e997a 100644
--- a/paimon-hive/paimon-hive-common/pom.xml
+++ b/paimon-hive/paimon-hive-common/pom.xml
@@ -41,6 +41,23 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-serde</artifactId>
diff --git 
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
 
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
index 68f91009f..9157ebd94 100644
--- 
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
+++ 
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
@@ -19,10 +19,19 @@
 package org.apache.paimon.hive;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -33,45 +42,166 @@ import static 
org.apache.hadoop.hive.metastore.Warehouse.getDnsPath;
  * where the Paimon table is stored.
  */
 public class LocationKeyExtractor {
+
     // special at the tbproperties with the name paimon_location.
     public static final String TBPROPERTIES_LOCATION_KEY = "paimon_location";
 
-    public static String getLocation(Properties properties) {
-        String storageLocation =
-                
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
-        String propertiesLocation = 
properties.getProperty(TBPROPERTIES_LOCATION_KEY);
-        return propertiesLocation != null ? propertiesLocation : 
storageLocation;
-    }
+    public static final String INTERNAL_LOCATION = "paimon.internal.location";
 
-    public static String getLocation(Table table) {
-        String sdLocation = table.getSd().getLocation();
+    /** Get the real path of the Paimon table. */
+    public static String getPaimonLocation(@Nullable Configuration conf, 
Properties properties) {
+        // read from table properties
+        // if users set HiveCatalogOptions#LOCATION_IN_PROPERTIES
+        String location = properties.getProperty(TBPROPERTIES_LOCATION_KEY);
+        if (location != null) {
+            return location;
+        }
 
-        Map<String, String> params = table.getParameters();
+        // read what metastore tells us
+        location = 
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
+        if (location != null) {
+            if (conf != null) {
+                try {
+                    return getDnsPath(new Path(location), conf).toString();
+                } catch (MetaException e) {
+                    throw new RuntimeException(e);
+                }
+            } else {
+                return location;
+            }
+        }
 
-        String propertiesLocation = null;
+        // for some Hive compatible systems
+        if (conf != null) {
+            return conf.get("table.original.path");
+        }
+
+        return null;
+    }
+
+    /** Get the real path of the Paimon table. */
+    public static String getPaimonLocation(Configuration conf, Table table) 
throws MetaException {
+        // read from table properties
+        // if users set HiveCatalogOptions#LOCATION_IN_PROPERTIES
+        Map<String, String> params = table.getParameters();
         if (params != null) {
-            propertiesLocation = params.get(TBPROPERTIES_LOCATION_KEY);
+            String location = params.get(TBPROPERTIES_LOCATION_KEY);
+            if (location != null) {
+                return location;
+            }
         }
 
-        return propertiesLocation != null ? propertiesLocation : sdLocation;
+        // read what metastore tells us
+        String location = table.getSd().getLocation();
+        if (location != null) {
+            location = getDnsPath(new Path(location), conf).toString();
+            table.getSd().setLocation(location);
+        }
+        return location;
     }
 
-    public static String getLocation(Table table, Configuration conf) throws 
MetaException {
-        String sdLocation = table.getSd().getLocation();
-        if (sdLocation != null) {
-            org.apache.hadoop.fs.Path path;
-            path = getDnsPath(new org.apache.hadoop.fs.Path(sdLocation), conf);
-            sdLocation = path.toUri().toString();
-            table.getSd().setLocation(sdLocation);
+    /** Get the real path of the Paimon table. */
+    public static String getPaimonLocation(JobConf conf) {
+        // read what PaimonStorageHandler tells us
+        String location = conf.get(INTERNAL_LOCATION);
+        if (location != null) {
+            return location;
         }
 
-        Map<String, String> params = table.getParameters();
+        // read from table properties
+        // if users set HiveCatalogOptions#LOCATION_IN_PROPERTIES
+        location = conf.get(TBPROPERTIES_LOCATION_KEY);
+        if (location != null) {
+            return location;
+        }
 
-        String propertiesLocation = null;
-        if (params != null) {
-            propertiesLocation = params.get(TBPROPERTIES_LOCATION_KEY);
+        // for some Hive compatible systems
+        location = conf.get("table.original.path");
+        if (location != null) {
+            return location;
+        }
+
+        // read the input dir of this Hive job
+        //
+        // it is possible that input dir is the directory of a partition,
+        // so we should find the root of table by checking if,
+        // in each parent directory, the schema directory exists
+        location = conf.get(FileInputFormat.INPUT_DIR);
+        if (location != null) {
+            Path path = new Path(location);
+            try {
+                FileSystem fs = path.getFileSystem(conf);
+                while (path != null) {
+                    if (fs.exists(new Path(path, "schema"))) {
+                        break;
+                    }
+                    path = path.getParent();
+                }
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+            if (path != null) {
+                try {
+                    return getDnsPath(path, conf).toString();
+                } catch (MetaException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Get the path stated in metastore. It is not necessarily the real path of
the Paimon table.
+     *
+     * <p>If the path of the Paimon table is moved from the location of the 
Hive table to properties
+     * (see HiveCatalogOptions#LOCATION_IN_PROPERTIES), Hive will add a 
location for this table
+     * based on the warehouse, database, and table automatically. When 
querying by Hive, an
+     * exception may occur because the specified path for split for Paimon may 
not match the
+     * location of Hive. To work around this problem, we specify the path for 
split as the location
+     * of Hive.
+     */
+    public static String getMetastoreLocation(Configuration conf, List<String> 
partitionKeys) {
+        // read what metastore tells us
+        String location = 
conf.get(hive_metastoreConstants.META_TABLE_LOCATION);
+        if (location != null) {
+            try {
+                return getDnsPath(new Path(location), conf).toString();
+            } catch (MetaException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // for some Hive compatible systems
+        location = conf.get("table.original.path");
+        if (location != null) {
+            return location;
+        }
+
+        // read the input dir of this Hive job
+        //
+        // it is possible that input dir is the directory of a partition,
+        // so we should find the root of table by removing the partition 
directory
+        location = conf.get(FileInputFormat.INPUT_DIR);
+        if (location != null) {
+            Path path = new Path(location);
+            int numPartitionKeys = partitionKeys.size();
+            if (numPartitionKeys > 0
+                    && 
path.getName().startsWith(partitionKeys.get(numPartitionKeys - 1) + "=")) {
+                for (int i = 0; i < numPartitionKeys; i++) {
+                    path = path.getParent();
+                }
+            }
+            if (path != null) {
+                try {
+                    return getDnsPath(path, conf).toString();
+                } catch (MetaException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
 
-        return propertiesLocation != null ? propertiesLocation : sdLocation;
+        return null;
     }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
index f1e4c6b8d..db49c32eb 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.utils.HiveUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -92,7 +93,7 @@ public class HiveSchema {
 
     /** Extract {@link HiveSchema} from Hive serde properties. */
     public static HiveSchema extract(@Nullable Configuration configuration, 
Properties properties) {
-        String location = LocationKeyExtractor.getLocation(properties);
+        String location = 
LocationKeyExtractor.getPaimonLocation(configuration, properties);
         Optional<TableSchema> tableSchema = getExistingSchema(configuration, 
location);
         String columnProperty = 
properties.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
 
@@ -181,7 +182,7 @@ public class HiveSchema {
             return Optional.empty();
         }
         Path path = new Path(location);
-        Options options = PaimonJobConf.extractCatalogConfig(configuration);
+        Options options = HiveUtils.extractCatalogConfig(configuration);
         options.set(CoreOptions.PATH, location);
         CatalogContext context = CatalogContext.create(options, configuration);
         try {
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
deleted file mode 100644
index 054ce565b..000000000
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.hive;
-
-import org.apache.paimon.hive.mapred.PaimonOutputCommitter;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.utils.JsonSerdeUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Utility class to convert Hive table property keys and get file store 
specific configurations from
- * {@link JobConf}.
- */
-public class PaimonJobConf {
-
-    private static final String INTERNAL_LOCATION = "paimon.internal.location";
-    private static final String INTERNAL_CATALOG_CONFIG = 
"paimon.catalog.config";
-
-    public static final String MAPRED_OUTPUT_COMMITTER = 
"mapred.output.committer.class";
-
-    public static final String PAIMON_WRITE = "paimon.write";
-
-    private static final String PAIMON_PREFIX = "paimon.";
-
-    private final JobConf jobConf;
-
-    public PaimonJobConf(JobConf jobConf) {
-        this.jobConf = jobConf;
-    }
-
-    public static void configureInputJobProperties(
-            Configuration configuration, Properties properties, Map<String, 
String> map) {
-        map.put(
-                INTERNAL_CATALOG_CONFIG,
-                
JsonSerdeUtil.toJson(extractCatalogConfig(configuration).toMap()));
-        map.put(INTERNAL_LOCATION, 
LocationKeyExtractor.getLocation(properties));
-    }
-
-    public static void configureOutputJobProperties(
-            Configuration configuration, Properties properties, Map<String, 
String> map) {
-        map.put(INTERNAL_LOCATION, 
LocationKeyExtractor.getLocation(properties));
-        map.put(MAPRED_OUTPUT_COMMITTER, 
PaimonOutputCommitter.class.getName());
-        map.put(PAIMON_WRITE, Boolean.TRUE.toString());
-        properties.put(PAIMON_WRITE, Boolean.TRUE.toString());
-    }
-
-    public String getLocation() {
-        return jobConf.get(INTERNAL_LOCATION);
-    }
-
-    /** Extract paimon catalog conf from Hive conf. */
-    public static Options extractCatalogConfig(Configuration hiveConf) {
-        Map<String, String> configMap = new HashMap<>();
-
-        if (hiveConf != null) {
-            for (Map.Entry<String, String> entry : hiveConf) {
-                String name = entry.getKey();
-                if (name.startsWith(PAIMON_PREFIX)) {
-                    String value = hiveConf.get(name);
-                    name = name.substring(PAIMON_PREFIX.length());
-                    configMap.put(name, value);
-                }
-            }
-        }
-        return Options.fromMap(configMap);
-    }
-}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
index 42cccd6a2..fe2ebfad1 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.mapred.PaimonInputFormat;
 import org.apache.paimon.hive.mapred.PaimonOutputFormat;
+import org.apache.paimon.hive.utils.HiveUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
@@ -77,7 +78,7 @@ public class PaimonMetaHook implements HiveMetaHook {
 
         
table.getSd().setInputFormat(PaimonInputFormat.class.getCanonicalName());
         
table.getSd().setOutputFormat(PaimonOutputFormat.class.getCanonicalName());
-        String location = LocationKeyExtractor.getLocation(table, conf);
+        String location = LocationKeyExtractor.getPaimonLocation(conf, table);
         if (location == null) {
             String warehouse = 
conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
             org.apache.hadoop.fs.Path hadoopPath =
@@ -130,7 +131,7 @@ public class PaimonMetaHook implements HiveMetaHook {
         }
 
         // we have created a paimon table, so we delete it to roll back;
-        String location = LocationKeyExtractor.getLocation(table);
+        String location = LocationKeyExtractor.getPaimonLocation(conf, table);
 
         Path path = new Path(location);
         CatalogContext context = catalogContext(table, location);
@@ -157,7 +158,7 @@ public class PaimonMetaHook implements HiveMetaHook {
     public void commitDropTable(Table table, boolean b) throws MetaException {}
 
     private CatalogContext catalogContext(Table table, String location) {
-        Options options = PaimonJobConf.extractCatalogConfig(conf);
+        Options options = HiveUtils.extractCatalogConfig(conf);
         options.set(CoreOptions.PATH, location);
         table.getParameters().forEach(options::set);
         return CatalogContext.create(options, conf);
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
index 0a6396aeb..44b2828d8 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
@@ -40,12 +40,12 @@ import org.apache.hadoop.mapred.OutputFormat;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.paimon.hive.PaimonJobConf.MAPRED_OUTPUT_COMMITTER;
-import static org.apache.paimon.hive.PaimonJobConf.PAIMON_WRITE;
-
 /** {@link HiveStorageHandler} for paimon. This is the entrance class of Hive 
API. */
 public class PaimonStorageHandler implements HiveStoragePredicateHandler, 
HiveStorageHandler {
 
+    private static final String MAPRED_OUTPUT_COMMITTER = 
"mapred.output.committer.class";
+    private static final String PAIMON_WRITE = "paimon.write";
+
     private Configuration conf;
 
     @Override
@@ -76,7 +76,9 @@ public class PaimonStorageHandler implements 
HiveStoragePredicateHandler, HiveSt
     @Override
     public void configureInputJobProperties(TableDesc tableDesc, Map<String, 
String> map) {
         Properties properties = tableDesc.getProperties();
-        PaimonJobConf.configureInputJobProperties(conf, properties, map);
+        map.put(
+                LocationKeyExtractor.INTERNAL_LOCATION,
+                LocationKeyExtractor.getPaimonLocation(conf, properties));
     }
 
     public void configureInputJobCredentials(TableDesc tableDesc, Map<String, 
String> map) {}
@@ -84,7 +86,12 @@ public class PaimonStorageHandler implements 
HiveStoragePredicateHandler, HiveSt
     @Override
     public void configureOutputJobProperties(TableDesc tableDesc, Map<String, 
String> map) {
         Properties properties = tableDesc.getProperties();
-        PaimonJobConf.configureOutputJobProperties(conf, properties, map);
+        map.put(
+                LocationKeyExtractor.INTERNAL_LOCATION,
+                LocationKeyExtractor.getPaimonLocation(conf, properties));
+        map.put(MAPRED_OUTPUT_COMMITTER, 
PaimonOutputCommitter.class.getName());
+        map.put(PAIMON_WRITE, Boolean.TRUE.toString());
+        properties.put(PAIMON_WRITE, Boolean.TRUE.toString());
     }
 
     @Override
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index ef8b5b1b3..4de2bfc05 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.hive.mapred;
 
+import org.apache.paimon.hive.LocationKeyExtractor;
 import org.apache.paimon.hive.RowDataContainer;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -27,7 +28,6 @@ import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.RowType;
 
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -56,16 +56,17 @@ public class PaimonInputFormat implements InputFormat<Void, 
RowDataContainer> {
 
     @Override
     public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
-        // If the path of the Paimon table is moved from the location of the 
Hive table to
-        // properties, Hive will add a location for this table based on the 
warehouse,
-        // database, and table automatically.When querying by Hive, an 
exception may occur
-        // because the specified path for split for Paimon may not match the 
location of Hive.
-        // To work around this problem, we specify the path for split as the 
location of Hive.
-        String location = 
jobConf.get(hive_metastoreConstants.META_TABLE_LOCATION);
-
         FileStoreTable table = createFileStoreTable(jobConf);
         InnerTableScan scan = table.newScan();
 
+        // If the path of the Paimon table is moved from the location of the 
Hive table to
+        // properties (see HiveCatalogOptions#LOCATION_IN_PROPERTIES), Hive 
will add a location for
+        // this table based on the warehouse, database, and table 
automatically. When querying by
+        // Hive, an exception may occur because the specified path for split 
for Paimon may not
+        // match the location of Hive. To work around this problem, we specify 
the path for split as
+        // the location of Hive.
+        String location = LocationKeyExtractor.getMetastoreLocation(jobConf, 
table.partitionKeys());
+
         List<Predicate> predicates = new ArrayList<>();
         String inputDir = jobConf.get(FileInputFormat.INPUT_DIR);
         createPartitionPredicate(table.schema().logicalRowType(), location, 
inputDir)
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
index de004d6c5..ec0b78deb 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
@@ -20,7 +20,7 @@ package org.apache.paimon.hive.utils;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.hive.PaimonJobConf;
+import org.apache.paimon.hive.LocationKeyExtractor;
 import org.apache.paimon.hive.SearchArgumentToPredicateConverter;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
@@ -28,22 +28,26 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.JobConf;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 /** Utils for create {@link FileStoreTable} and {@link Predicate}. */
 public class HiveUtils {
 
+    private static final String PAIMON_PREFIX = "paimon.";
+
     public static FileStoreTable createFileStoreTable(JobConf jobConf) {
-        PaimonJobConf wrapper = new PaimonJobConf(jobConf);
-        Options options = PaimonJobConf.extractCatalogConfig(jobConf);
-        options.set(CoreOptions.PATH, wrapper.getLocation());
+        Options options = extractCatalogConfig(jobConf);
+        options.set(CoreOptions.PATH, 
LocationKeyExtractor.getPaimonLocation(jobConf));
         CatalogContext catalogContext = CatalogContext.create(options, 
jobConf);
         return FileStoreTableFactory.create(catalogContext);
     }
@@ -65,4 +69,21 @@ public class HiveUtils {
                                 : null);
         return converter.convert();
     }
+
+    /** Extract paimon catalog conf from Hive conf. */
+    public static Options extractCatalogConfig(Configuration hiveConf) {
+        Map<String, String> configMap = new HashMap<>();
+
+        if (hiveConf != null) {
+            for (Map.Entry<String, String> entry : hiveConf) {
+                String name = entry.getKey();
+                if (name.startsWith(PAIMON_PREFIX)) {
+                    String value = hiveConf.get(name);
+                    name = name.substring(PAIMON_PREFIX.length());
+                    configMap.put(name, value);
+                }
+            }
+        }
+        return Options.fromMap(configMap);
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
index 6712dc0aa..af74d8667 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
@@ -61,7 +61,7 @@ public class CreateTableITCase extends HiveTestBase {
         assertThatThrownBy(() -> hiveShell.execute(hiveSql))
                 .hasRootCauseInstanceOf(IllegalArgumentException.class)
                 .hasRootCauseMessage(
-                        "Schema file not found in location "
+                        "Schema file not found in location file:"
                                 + path
                                 + ". Please create table first.");
     }

Reply via email to