This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ffb355578 [hive] Remove PaimonJobConf and clean up LocationKeyExtractor (#1558)
ffb355578 is described below
commit ffb3555780865cd6f26764e88a7db5a1dc3b0cce
Author: tsreaper <[email protected]>
AuthorDate: Thu Jul 13 14:10:49 2023 +0800
[hive] Remove PaimonJobConf and clean up LocationKeyExtractor (#1558)
This closes #1558.
---
paimon-hive/paimon-hive-common/pom.xml | 17 ++
.../apache/paimon/hive/LocationKeyExtractor.java | 178 ++++++++++++++++++---
.../java/org/apache/paimon/hive/HiveSchema.java | 5 +-
.../java/org/apache/paimon/hive/PaimonJobConf.java | 89 -----------
.../org/apache/paimon/hive/PaimonMetaHook.java | 7 +-
.../apache/paimon/hive/PaimonStorageHandler.java | 17 +-
.../paimon/hive/mapred/PaimonInputFormat.java | 17 +-
.../org/apache/paimon/hive/utils/HiveUtils.java | 29 +++-
.../org/apache/paimon/hive/CreateTableITCase.java | 2 +-
9 files changed, 225 insertions(+), 136 deletions(-)
diff --git a/paimon-hive/paimon-hive-common/pom.xml
b/paimon-hive/paimon-hive-common/pom.xml
index 1980d3a68..7216e997a 100644
--- a/paimon-hive/paimon-hive-common/pom.xml
+++ b/paimon-hive/paimon-hive-common/pom.xml
@@ -41,6 +41,23 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
diff --git
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
index 68f91009f..9157ebd94 100644
---
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
+++
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
@@ -19,10 +19,19 @@
package org.apache.paimon.hive;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -33,45 +42,166 @@ import static
org.apache.hadoop.hive.metastore.Warehouse.getDnsPath;
* where the Paimon table is stored.
*/
public class LocationKeyExtractor {
+
// special at the tbproperties with the name paimon_location.
public static final String TBPROPERTIES_LOCATION_KEY = "paimon_location";
- public static String getLocation(Properties properties) {
- String storageLocation =
-
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
- String propertiesLocation =
properties.getProperty(TBPROPERTIES_LOCATION_KEY);
- return propertiesLocation != null ? propertiesLocation :
storageLocation;
- }
+ public static final String INTERNAL_LOCATION = "paimon.internal.location";
- public static String getLocation(Table table) {
- String sdLocation = table.getSd().getLocation();
+ /** Get the real path of Paimon table. */
+ public static String getPaimonLocation(@Nullable Configuration conf,
Properties properties) {
+ // read from table properties
+ // if users set HiveCatalogOptions#LOCATION_IN_PROPERTIES
+ String location = properties.getProperty(TBPROPERTIES_LOCATION_KEY);
+ if (location != null) {
+ return location;
+ }
- Map<String, String> params = table.getParameters();
+ // read what metastore tells us
+ location =
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
+ if (location != null) {
+ if (conf != null) {
+ try {
+ return getDnsPath(new Path(location), conf).toString();
+ } catch (MetaException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return location;
+ }
+ }
- String propertiesLocation = null;
+ // for some Hive compatible systems
+ if (conf != null) {
+ return conf.get("table.original.path");
+ }
+
+ return null;
+ }
+
+ /** Get the real path of Paimon table. */
+ public static String getPaimonLocation(Configuration conf, Table table)
throws MetaException {
+ // read from table properties
+ // if users set HiveCatalogOptions#LOCATION_IN_PROPERTIES
+ Map<String, String> params = table.getParameters();
if (params != null) {
- propertiesLocation = params.get(TBPROPERTIES_LOCATION_KEY);
+ String location = params.get(TBPROPERTIES_LOCATION_KEY);
+ if (location != null) {
+ return location;
+ }
}
- return propertiesLocation != null ? propertiesLocation : sdLocation;
+ // read what metastore tells us
+ String location = table.getSd().getLocation();
+ if (location != null) {
+ location = getDnsPath(new Path(location), conf).toString();
+ table.getSd().setLocation(location);
+ }
+ return location;
}
- public static String getLocation(Table table, Configuration conf) throws
MetaException {
- String sdLocation = table.getSd().getLocation();
- if (sdLocation != null) {
- org.apache.hadoop.fs.Path path;
- path = getDnsPath(new org.apache.hadoop.fs.Path(sdLocation), conf);
- sdLocation = path.toUri().toString();
- table.getSd().setLocation(sdLocation);
+ /** Get the real path of Paimon table. */
+ public static String getPaimonLocation(JobConf conf) {
+ // read what PaimonStorageHandler tells us
+ String location = conf.get(INTERNAL_LOCATION);
+ if (location != null) {
+ return location;
}
- Map<String, String> params = table.getParameters();
+ // read from table properties
+ // if users set HiveCatalogOptions#LOCATION_IN_PROPERTIES
+ location = conf.get(TBPROPERTIES_LOCATION_KEY);
+ if (location != null) {
+ return location;
+ }
- String propertiesLocation = null;
- if (params != null) {
- propertiesLocation = params.get(TBPROPERTIES_LOCATION_KEY);
+ // for some Hive compatible systems
+ location = conf.get("table.original.path");
+ if (location != null) {
+ return location;
+ }
+
+ // read the input dir of this Hive job
+ //
+ // it is possible that input dir is the directory of a partition,
+ // so we should find the root of table by checking if,
+ // in each parent directory, the schema directory exists
+ location = conf.get(FileInputFormat.INPUT_DIR);
+ if (location != null) {
+ Path path = new Path(location);
+ try {
+ FileSystem fs = path.getFileSystem(conf);
+ while (path != null) {
+ if (fs.exists(new Path(path, "schema"))) {
+ break;
+ }
+ path = path.getParent();
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ if (path != null) {
+ try {
+ return getDnsPath(path, conf).toString();
+ } catch (MetaException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Get the path stated in metastore. It is not necessary the real path of
Paimon table.
+ *
+ * <p>If the path of the Paimon table is moved from the location of the
Hive table to properties
+ * (see HiveCatalogOptions#LOCATION_IN_PROPERTIES), Hive will add a
location for this table
+ * based on the warehouse, database, and table automatically. When
querying by Hive, an
+ * exception may occur because the specified path for split for Paimon may
not match the
+ * location of Hive. To work around this problem, we specify the path for
split as the location
+ * of Hive.
+ */
+ public static String getMetastoreLocation(Configuration conf, List<String>
partitionKeys) {
+ // read what metastore tells us
+ String location =
conf.get(hive_metastoreConstants.META_TABLE_LOCATION);
+ if (location != null) {
+ try {
+ return getDnsPath(new Path(location), conf).toString();
+ } catch (MetaException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // for some Hive compatible systems
+ location = conf.get("table.original.path");
+ if (location != null) {
+ return location;
+ }
+
+ // read the input dir of this Hive job
+ //
+ // it is possible that input dir is the directory of a partition,
+ // so we should find the root of table by removing the partition
directory
+ location = conf.get(FileInputFormat.INPUT_DIR);
+ if (location != null) {
+ Path path = new Path(location);
+ int numPartitionKeys = partitionKeys.size();
+ if (numPartitionKeys > 0
+ &&
path.getName().startsWith(partitionKeys.get(numPartitionKeys - 1) + "=")) {
+ for (int i = 0; i < numPartitionKeys; i++) {
+ path = path.getParent();
+ }
+ }
+ if (path != null) {
+ try {
+ return getDnsPath(path, conf).toString();
+ } catch (MetaException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
- return propertiesLocation != null ? propertiesLocation : sdLocation;
+ return null;
}
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
index f1e4c6b8d..db49c32eb 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.utils.HiveUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -92,7 +93,7 @@ public class HiveSchema {
/** Extract {@link HiveSchema} from Hive serde properties. */
public static HiveSchema extract(@Nullable Configuration configuration,
Properties properties) {
- String location = LocationKeyExtractor.getLocation(properties);
+ String location =
LocationKeyExtractor.getPaimonLocation(configuration, properties);
Optional<TableSchema> tableSchema = getExistingSchema(configuration,
location);
String columnProperty =
properties.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
@@ -181,7 +182,7 @@ public class HiveSchema {
return Optional.empty();
}
Path path = new Path(location);
- Options options = PaimonJobConf.extractCatalogConfig(configuration);
+ Options options = HiveUtils.extractCatalogConfig(configuration);
options.set(CoreOptions.PATH, location);
CatalogContext context = CatalogContext.create(options, configuration);
try {
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
deleted file mode 100644
index 054ce565b..000000000
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.hive;
-
-import org.apache.paimon.hive.mapred.PaimonOutputCommitter;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.utils.JsonSerdeUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Utility class to convert Hive table property keys and get file store
specific configurations from
- * {@link JobConf}.
- */
-public class PaimonJobConf {
-
- private static final String INTERNAL_LOCATION = "paimon.internal.location";
- private static final String INTERNAL_CATALOG_CONFIG =
"paimon.catalog.config";
-
- public static final String MAPRED_OUTPUT_COMMITTER =
"mapred.output.committer.class";
-
- public static final String PAIMON_WRITE = "paimon.write";
-
- private static final String PAIMON_PREFIX = "paimon.";
-
- private final JobConf jobConf;
-
- public PaimonJobConf(JobConf jobConf) {
- this.jobConf = jobConf;
- }
-
- public static void configureInputJobProperties(
- Configuration configuration, Properties properties, Map<String,
String> map) {
- map.put(
- INTERNAL_CATALOG_CONFIG,
-
JsonSerdeUtil.toJson(extractCatalogConfig(configuration).toMap()));
- map.put(INTERNAL_LOCATION,
LocationKeyExtractor.getLocation(properties));
- }
-
- public static void configureOutputJobProperties(
- Configuration configuration, Properties properties, Map<String,
String> map) {
- map.put(INTERNAL_LOCATION,
LocationKeyExtractor.getLocation(properties));
- map.put(MAPRED_OUTPUT_COMMITTER,
PaimonOutputCommitter.class.getName());
- map.put(PAIMON_WRITE, Boolean.TRUE.toString());
- properties.put(PAIMON_WRITE, Boolean.TRUE.toString());
- }
-
- public String getLocation() {
- return jobConf.get(INTERNAL_LOCATION);
- }
-
- /** Extract paimon catalog conf from Hive conf. */
- public static Options extractCatalogConfig(Configuration hiveConf) {
- Map<String, String> configMap = new HashMap<>();
-
- if (hiveConf != null) {
- for (Map.Entry<String, String> entry : hiveConf) {
- String name = entry.getKey();
- if (name.startsWith(PAIMON_PREFIX)) {
- String value = hiveConf.get(name);
- name = name.substring(PAIMON_PREFIX.length());
- configMap.put(name, value);
- }
- }
- }
- return Options.fromMap(configMap);
- }
-}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
index 42cccd6a2..fe2ebfad1 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.mapred.PaimonInputFormat;
import org.apache.paimon.hive.mapred.PaimonOutputFormat;
+import org.apache.paimon.hive.utils.HiveUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
@@ -77,7 +78,7 @@ public class PaimonMetaHook implements HiveMetaHook {
table.getSd().setInputFormat(PaimonInputFormat.class.getCanonicalName());
table.getSd().setOutputFormat(PaimonOutputFormat.class.getCanonicalName());
- String location = LocationKeyExtractor.getLocation(table, conf);
+ String location = LocationKeyExtractor.getPaimonLocation(conf, table);
if (location == null) {
String warehouse =
conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
org.apache.hadoop.fs.Path hadoopPath =
@@ -130,7 +131,7 @@ public class PaimonMetaHook implements HiveMetaHook {
}
// we have created a paimon table, so we delete it to roll back;
- String location = LocationKeyExtractor.getLocation(table);
+ String location = LocationKeyExtractor.getPaimonLocation(conf, table);
Path path = new Path(location);
CatalogContext context = catalogContext(table, location);
@@ -157,7 +158,7 @@ public class PaimonMetaHook implements HiveMetaHook {
public void commitDropTable(Table table, boolean b) throws MetaException {}
private CatalogContext catalogContext(Table table, String location) {
- Options options = PaimonJobConf.extractCatalogConfig(conf);
+ Options options = HiveUtils.extractCatalogConfig(conf);
options.set(CoreOptions.PATH, location);
table.getParameters().forEach(options::set);
return CatalogContext.create(options, conf);
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
index 0a6396aeb..44b2828d8 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
@@ -40,12 +40,12 @@ import org.apache.hadoop.mapred.OutputFormat;
import java.util.Map;
import java.util.Properties;
-import static org.apache.paimon.hive.PaimonJobConf.MAPRED_OUTPUT_COMMITTER;
-import static org.apache.paimon.hive.PaimonJobConf.PAIMON_WRITE;
-
/** {@link HiveStorageHandler} for paimon. This is the entrance class of Hive
API. */
public class PaimonStorageHandler implements HiveStoragePredicateHandler,
HiveStorageHandler {
+ private static final String MAPRED_OUTPUT_COMMITTER =
"mapred.output.committer.class";
+ private static final String PAIMON_WRITE = "paimon.write";
+
private Configuration conf;
@Override
@@ -76,7 +76,9 @@ public class PaimonStorageHandler implements
HiveStoragePredicateHandler, HiveSt
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String,
String> map) {
Properties properties = tableDesc.getProperties();
- PaimonJobConf.configureInputJobProperties(conf, properties, map);
+ map.put(
+ LocationKeyExtractor.INTERNAL_LOCATION,
+ LocationKeyExtractor.getPaimonLocation(conf, properties));
}
public void configureInputJobCredentials(TableDesc tableDesc, Map<String,
String> map) {}
@@ -84,7 +86,12 @@ public class PaimonStorageHandler implements
HiveStoragePredicateHandler, HiveSt
@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String,
String> map) {
Properties properties = tableDesc.getProperties();
- PaimonJobConf.configureOutputJobProperties(conf, properties, map);
+ map.put(
+ LocationKeyExtractor.INTERNAL_LOCATION,
+ LocationKeyExtractor.getPaimonLocation(conf, properties));
+ map.put(MAPRED_OUTPUT_COMMITTER,
PaimonOutputCommitter.class.getName());
+ map.put(PAIMON_WRITE, Boolean.TRUE.toString());
+ properties.put(PAIMON_WRITE, Boolean.TRUE.toString());
}
@Override
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index ef8b5b1b3..4de2bfc05 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.paimon.hive.mapred;
+import org.apache.paimon.hive.LocationKeyExtractor;
import org.apache.paimon.hive.RowDataContainer;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -27,7 +28,6 @@ import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.RowType;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -56,16 +56,17 @@ public class PaimonInputFormat implements InputFormat<Void,
RowDataContainer> {
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
- // If the path of the Paimon table is moved from the location of the
Hive table to
- // properties, Hive will add a location for this table based on the
warehouse,
- // database, and table automatically.When querying by Hive, an
exception may occur
- // because the specified path for split for Paimon may not match the
location of Hive.
- // To work around this problem, we specify the path for split as the
location of Hive.
- String location =
jobConf.get(hive_metastoreConstants.META_TABLE_LOCATION);
-
FileStoreTable table = createFileStoreTable(jobConf);
InnerTableScan scan = table.newScan();
+ // If the path of the Paimon table is moved from the location of the
Hive table to
+ // properties (see HiveCatalogOptions#LOCATION_IN_PROPERTIES), Hive
will add a location for
+ // this table based on the warehouse, database, and table
automatically. When querying by
+ // Hive, an exception may occur because the specified path for split
for Paimon may not
+ // match the location of Hive. To work around this problem, we specify
the path for split as
+ // the location of Hive.
+ String location = LocationKeyExtractor.getMetastoreLocation(jobConf,
table.partitionKeys());
+
List<Predicate> predicates = new ArrayList<>();
String inputDir = jobConf.get(FileInputFormat.INPUT_DIR);
createPartitionPredicate(table.schema().logicalRowType(), location,
inputDir)
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
index de004d6c5..ec0b78deb 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
@@ -20,7 +20,7 @@ package org.apache.paimon.hive.utils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.hive.PaimonJobConf;
+import org.apache.paimon.hive.LocationKeyExtractor;
import org.apache.paimon.hive.SearchArgumentToPredicateConverter;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
@@ -28,22 +28,26 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/** Utils for create {@link FileStoreTable} and {@link Predicate}. */
public class HiveUtils {
+ private static final String PAIMON_PREFIX = "paimon.";
+
public static FileStoreTable createFileStoreTable(JobConf jobConf) {
- PaimonJobConf wrapper = new PaimonJobConf(jobConf);
- Options options = PaimonJobConf.extractCatalogConfig(jobConf);
- options.set(CoreOptions.PATH, wrapper.getLocation());
+ Options options = extractCatalogConfig(jobConf);
+ options.set(CoreOptions.PATH,
LocationKeyExtractor.getPaimonLocation(jobConf));
CatalogContext catalogContext = CatalogContext.create(options,
jobConf);
return FileStoreTableFactory.create(catalogContext);
}
@@ -65,4 +69,21 @@ public class HiveUtils {
: null);
return converter.convert();
}
+
+ /** Extract paimon catalog conf from Hive conf. */
+ public static Options extractCatalogConfig(Configuration hiveConf) {
+ Map<String, String> configMap = new HashMap<>();
+
+ if (hiveConf != null) {
+ for (Map.Entry<String, String> entry : hiveConf) {
+ String name = entry.getKey();
+ if (name.startsWith(PAIMON_PREFIX)) {
+ String value = hiveConf.get(name);
+ name = name.substring(PAIMON_PREFIX.length());
+ configMap.put(name, value);
+ }
+ }
+ }
+ return Options.fromMap(configMap);
+ }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
index 6712dc0aa..af74d8667 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
@@ -61,7 +61,7 @@ public class CreateTableITCase extends HiveTestBase {
assertThatThrownBy(() -> hiveShell.execute(hiveSql))
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage(
- "Schema file not found in location "
+ "Schema file not found in location file:"
+ path
+ ". Please create table first.");
}