This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch internal-main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit c503e45a73694d0bab50731be647dfa0cba7925b Author: Yuhui <[email protected]> AuthorDate: Thu Dec 11 11:26:26 2025 +0800 [#9444] feat (hive-catalog): Implement the Hive shim class to access Hive2/3 (#9446) ### What changes were proposed in this pull request? Implement the Hive shim class to access Hive2/3 ### Why are the changes needed? Fix: #9444 ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Add UTs --- .../java/org/apache/gravitino/hive/HiveColumn.java | 63 +++ .../java/org/apache/gravitino/hive/HiveTable.java | 35 ++ .../gravitino/hive/client/HiveClientFactory.java | 5 +- .../gravitino/hive/client/HiveClientImpl.java | 97 +++-- .../hive/client/HiveExceptionConverter.java | 153 ++++++- .../org/apache/gravitino/hive/client/HiveShim.java | 2 - .../apache/gravitino/hive/client/HiveShimV2.java | 280 +++++++++++++ .../apache/gravitino/hive/client/HiveShimV3.java | 463 +++++++++++++++++++++ .../org/apache/gravitino/hive/client/Util.java | 5 +- .../hive/converter/HiveDatabaseConverter.java | 94 +++++ .../hive/converter/HiveTableConverter.java | 307 +++++++++++++- .../hive/converter/TestHiveTableConverter.java | 63 +-- 12 files changed, 1456 insertions(+), 111 deletions(-) diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveColumn.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveColumn.java new file mode 100644 index 0000000000..69b97abd44 --- /dev/null +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveColumn.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.hive; + +import lombok.EqualsAndHashCode; +import org.apache.gravitino.connector.BaseColumn; + +/** Represents a column in an Apache Hive Metastore catalog. */ +@EqualsAndHashCode(callSuper = true) +public class HiveColumn extends BaseColumn { + + private HiveColumn() {} + + /** A builder class for constructing HiveColumn instances. */ + public static class Builder extends BaseColumnBuilder<Builder, HiveColumn> { + + /** Creates a new instance of {@link Builder}. */ + private Builder() {} + + /** + * Internal method to build a HiveColumn instance using the provided values. + * + * @return A new HiveColumn instance with the configured values. + */ + @Override + protected HiveColumn internalBuild() { + HiveColumn hiveColumn = new HiveColumn(); + + hiveColumn.name = name; + hiveColumn.comment = comment; + hiveColumn.dataType = dataType; + hiveColumn.nullable = nullable; + hiveColumn.autoIncrement = autoIncrement; + hiveColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET : defaultValue; + return hiveColumn; + } + } + + /** + * Creates a new instance of {@link Builder}. 
+ * + * @return The new instance. + */ + public static Builder builder() { + return new Builder(); + } +} diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java index 59ec54bfc4..b37485553d 100644 --- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java @@ -22,13 +22,20 @@ import static org.apache.gravitino.catalog.hive.HiveConstants.COMMENT; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import lombok.ToString; import org.apache.gravitino.catalog.hive.TableType; import org.apache.gravitino.connector.BaseTable; import org.apache.gravitino.connector.ProxyPlugin; import org.apache.gravitino.connector.TableOperations; +import org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; /** Represents an Apache Hive Table entity in the Hive Metastore catalog. */ @ToString @@ -129,4 +136,32 @@ public class HiveTable extends BaseTable { public static Builder builder() { return new Builder(); } + + /** + * Returns all partition field names defined on this table in declaration order. Callers that need + * uniqueness can convert the returned list to a {@link java.util.Set}. + */ + public List<String> partitionFieldNames() { + if (partitioning() == null) { + return Collections.emptyList(); + } + + return Arrays.stream(partitioning()) + .flatMap(transform -> extractPartitionFieldNames(transform).stream()) + .collect(Collectors.toList()); + } + + private static List<String> extractPartitionFieldNames(Transform partitioning) { + if (partitioning instanceof Transforms.IdentityTransform) { + return Arrays.asList(((Transforms.IdentityTransform) partitioning).fieldName()); + } + + if (partitioning.arguments().length > 0 + && partitioning.arguments()[0] instanceof NamedReference.FieldReference fieldReference) { + return Arrays.asList(fieldReference.fieldName()); + } + + throw new IllegalArgumentException( + String.format("Unsupported partition transform type: %s", partitioning.getClass())); + } } diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java index 1c74ca908f..4442125d9e 100644 --- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java @@ -22,7 +22,7 @@ package org.apache.gravitino.hive.client; import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_URIS; import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE2; import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE3; -import static org.apache.gravitino.hive.client.Util.buildConfigurationFromProperties; +import static org.apache.gravitino.hive.client.Util.updateConfigurationFromProperties; import com.google.common.base.Preconditions; import java.lang.reflect.Constructor; @@ -58,7 +58,8 
@@ public final class HiveClientFactory { this.properties = properties; try { - this.hadoopConf = buildConfigurationFromProperties(properties); + this.hadoopConf = new Configuration(); + updateConfigurationFromProperties(properties, hadoopConf); } catch (Exception e) { throw new RuntimeException("Failed to initialize HiveClientFactory", e); } diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java index 310e6eb258..935f1c85c5 100644 --- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java @@ -19,6 +19,7 @@ package org.apache.gravitino.hive.client; import java.util.List; +import java.util.Properties; import org.apache.gravitino.hive.HivePartition; import org.apache.gravitino.hive.HiveSchema; import org.apache.gravitino.hive.HiveTable; @@ -29,44 +30,72 @@ import org.apache.hadoop.security.UserGroupInformation; * partition operations. */ public class HiveClientImpl implements HiveClient { + + private final HiveShim shim; + + public HiveClientImpl(HiveClientClassLoader.HiveVersion hiveVersion, Properties properties) { + switch (hiveVersion) { + case HIVE2: + { + shim = new HiveShimV2(properties); + break; + } + case HIVE3: + { + shim = new HiveShimV3(properties); + break; + } + default: + throw new IllegalArgumentException("Unsupported Hive version: " + hiveVersion); + } + } + @Override - public void createDatabase(HiveSchema database) {} + public List<String> getAllDatabases(String catalogName) { + return shim.getAllDatabases(catalogName); + } @Override - public HiveSchema getDatabase(String catalogName, String databaseName) { - return null; + public void createDatabase(HiveSchema database) { + shim.createDatabase(database); } @Override - public List<String> getAllDatabases(String catalogName) { - return List.of(); + public HiveSchema getDatabase(String catalogName, String databaseName) { + return shim.getDatabase(catalogName, databaseName); } @Override - public void alterDatabase(String catalogName, String databaseName, HiveSchema database) {} + public void alterDatabase(String catalogName, String databaseName, HiveSchema database) { + shim.alterDatabase(catalogName, databaseName, database); + } @Override - public void dropDatabase(String catalogName, String databaseName, boolean cascade) {} + public void dropDatabase(String catalogName, String databaseName, boolean cascade) { + shim.dropDatabase(catalogName, databaseName, cascade); + } @Override public List<String> getAllTables(String catalogName, String databaseName) { - return List.of(); + return shim.getAllTables(catalogName, databaseName); } @Override public List<String> listTableNamesByFilter( String catalogName, String databaseName, String filter, short pageSize) { - return List.of(); + return shim.listTableNamesByFilter(catalogName, databaseName, filter, pageSize); } @Override public HiveTable getTable(String catalogName, String databaseName, String tableName) { - return null; + return shim.getTable(catalogName, databaseName, tableName); } @Override public void alterTable( - String catalogName, String databaseName, String tableName, HiveTable alteredHiveTable) {} + String catalogName, String databaseName, String tableName, HiveTable alteredHiveTable) { + shim.alterTable(catalogName, databaseName, tableName, alteredHiveTable); + } 
@Override public void dropTable( @@ -74,62 +103,82 @@ public class HiveClientImpl implements HiveClient { String databaseName, String tableName, boolean deleteData, - boolean ifPurge) {} + boolean ifPurge) { + shim.dropTable(catalogName, databaseName, tableName, deleteData, ifPurge); + } @Override - public void createTable(HiveTable hiveTable) {} + public void createTable(HiveTable hiveTable) { + shim.createTable(hiveTable); + } @Override public List<String> listPartitionNames(HiveTable table, short pageSize) { - return List.of(); + return shim.listPartitionNames(table, pageSize); } @Override public List<HivePartition> listPartitions(HiveTable table, short pageSize) { - return List.of(); + return shim.listPartitions(table, pageSize); } @Override public List<HivePartition> listPartitions( HiveTable table, List<String> filterPartitionValueList, short pageSize) { - return List.of(); + return shim.listPartitions(table, filterPartitionValueList, pageSize); } @Override public HivePartition getPartition(HiveTable table, String partitionName) { - return null; + return shim.getPartition(table, partitionName); } @Override public HivePartition addPartition(HiveTable table, HivePartition partition) { - return null; + return shim.addPartition(table, partition); } @Override public void dropPartition( - String catalogName, String databaseName, String tableName, String partitionName, boolean b) {} + String catalogName, + String databaseName, + String tableName, + String partitionName, + boolean deleteData) { + shim.dropPartition(catalogName, databaseName, tableName, partitionName, deleteData); + } @Override public String getDelegationToken(String finalPrincipalName, String userName) { - return ""; + return shim.getDelegationToken(finalPrincipalName, userName); } @Override public List<HiveTable> getTableObjectsByName( String catalogName, String databaseName, List<String> allTables) { - return List.of(); + return shim.getTableObjectsByName(catalogName, databaseName, allTables); } @Override public List<String> getCatalogs() { - return List.of(); + return shim.getCatalogs(); } @Override - public void close() {} + public void close() { + try { + shim.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close HiveClient", e); + } + } @Override public UserGroupInformation getUser() { - return null; + try { + return UserGroupInformation.getCurrentUser(); + } catch (Exception e) { + throw new RuntimeException("Failed to get current user", e); + } } } diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java index 5e4c70885f..9371e9c8da 100644 --- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java @@ -18,6 +18,22 @@ */ package org.apache.gravitino.hive.client; +import java.lang.reflect.InvocationTargetException; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Stream; +import org.apache.gravitino.exceptions.AlreadyExistsException; +import org.apache.gravitino.exceptions.ConnectionFailedException; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.exceptions.NoSuchPartitionException; +import 
org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.exceptions.NonEmptySchemaException; +import org.apache.gravitino.exceptions.PartitionAlreadyExistsException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; + /** * Utility class to convert Hive exceptions to Gravitino exceptions. This class handles various * types of exceptions that can be thrown by Hive Metastore operations, including: @@ -32,7 +48,7 @@ package org.apache.gravitino.hive.client; */ public class HiveExceptionConverter { - enum TargetType { + private enum TargetType { TABLE, SCHEMA, PARTITION, @@ -79,6 +95,15 @@ public class HiveExceptionConverter { } } + private static final Set<String> NO_SUCH_EXCEPTION_SET = + Set.of( + "org.apache.hadoop.hive.metastore.api.NoSuchObjectException", + "org.apache.hadoop.hive.metastore.api.UnknownTableException", + "org.apache.hadoop.hive.metastore.api.UnknownDBException", + "org.apache.hadoop.hive.metastore.api.UnknownPartitionException", + "org.apache.hadoop.hive.metastore.api.InvalidObjectException", + "org.apache.hadoop.hive.metastore.api.InvalidPartitionException"); + private HiveExceptionConverter() {} /** @@ -89,6 +114,130 @@ public class HiveExceptionConverter { * @return A Gravitino exception */ public static RuntimeException toGravitinoException(Exception e, ExceptionTarget target) { - return null; + Throwable cause = unwrapException(e); + return convertException(cause, target); + } + + /** + * Unwraps nested exceptions, especially InvocationTargetException from reflection calls. + * + * @param e The exception to unwrap + * @return The unwrapped exception + */ + private static Throwable unwrapException(Exception e) { + Throwable cause = e; + if (e instanceof InvocationTargetException) { + InvocationTargetException ite = (InvocationTargetException) e; + cause = ite.getTargetException(); + if (cause == null) { + cause = e; + } + } + return cause; + } + + /** + * Converts the exception to the appropriate Gravitino exception based on its type. + * + * @param cause The exception cause + * @param target The target Hive object of the operation + * @return A Gravitino exception + */ + private static RuntimeException convertException(Throwable cause, ExceptionTarget target) { + if (cause instanceof RuntimeException && cause.getCause() instanceof Exception) { + return toGravitinoException((Exception) cause.getCause(), target); + } + + String message = cause.getMessage(); + String lowerMessage = message != null ?
message.toLowerCase(Locale.ROOT) : ""; + String exceptionClassName = cause.getClass().getName(); + + if (exceptionClassName.contains("AlreadyExistsException")) { + return toAlreadyExistsException(cause, target, message); + } + + if (NO_SUCH_EXCEPTION_SET.contains(exceptionClassName)) { + return toNoSuchObjectException(cause, target, message); + } + + if (exceptionClassName.contains("InvalidOperationException")) { + if (isNonEmptySchemaMessage(lowerMessage)) { + return new NonEmptySchemaException( + cause, "Hive schema %s is not empty in Hive Metastore", target.name()); + } + return new IllegalArgumentException(cause.getMessage(), cause); + } + + if (exceptionClassName.contains("MetaException")) { + if (lowerMessage.contains("invalid partition key")) { + return new NoSuchPartitionException( + cause, "Hive partition %s does not exist in Hive Metastore", target.name()); + } + } + + if (exceptionClassName.contains("TException")) { + if (lowerMessage.contains("already exists")) { + return toAlreadyExistsException(cause, target, message); + } + if (isNotFoundKeyword(lowerMessage)) { + return toNoSuchObjectException(cause, target, message); + } + if (isNonEmptySchemaMessage(lowerMessage)) { + return new NonEmptySchemaException( + cause, "Hive schema %s is not empty in Hive Metastore", target.name()); + } + } + + if (isConnectionKeyword(lowerMessage) || exceptionClassName.contains("TransportException")) { + return new ConnectionFailedException( + cause, "Failed to connect to Hive Metastore: %s", target.name()); + } + + if (cause instanceof RuntimeException) { + return (RuntimeException) cause; + } + return new GravitinoRuntimeException(cause, message); + } + + private static boolean isNotFoundKeyword(String lowerMessage) { + return Stream.of("does not exist", "not found", "no such", "there is no") + .anyMatch(lowerMessage::contains); + } + + private static boolean isConnectionKeyword(String lowerMessage) { + return Stream.of("connection", "connect", "timeout", "network") + .anyMatch(lowerMessage::contains); + } + + private static boolean isNonEmptySchemaMessage(String lowerMessage) { + return (lowerMessage.contains("non-empty") || lowerMessage.contains("not empty")) + && (lowerMessage.contains("schema") || lowerMessage.contains("database")); + } + + private static RuntimeException toAlreadyExistsException( + Throwable cause, ExceptionTarget target, String rawMessage) { + TargetType objectType = target.type(); + return switch (objectType) { + case PARTITION -> new PartitionAlreadyExistsException( + cause, "Hive partition %s already exists in Hive Metastore", target.name()); + case TABLE -> new TableAlreadyExistsException( + cause, "Hive table %s already exists in Hive Metastore", target.name()); + case SCHEMA -> new SchemaAlreadyExistsException( + cause, "Hive schema %s already exists in Hive Metastore", target.name()); + default -> new AlreadyExistsException(cause, "%s", rawMessage); + }; + } + + private static RuntimeException toNoSuchObjectException( + Throwable cause, ExceptionTarget target, String rawMessage) { + return switch (target.type()) { + case PARTITION -> new NoSuchPartitionException( + cause, "Hive partition %s does not exist in Hive Metastore", target.name()); + case TABLE -> new NoSuchTableException( + cause, "Hive table %s does not exist in Hive Metastore", target.name()); + case SCHEMA -> new NoSuchSchemaException( + cause, "Hive schema %s does not exist in Hive Metastore", target.name()); + default -> new NoSuchEntityException(cause, "%s", rawMessage); + }; } } diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java index d780354ef9..dc477129ee 100644 --- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java @@ -37,8 +37,6 @@ public abstract class HiveShim { protected static final String RETRYING_META_STORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.RetryingMetaStoreClient"; - protected static final String IMETA_STORE_CLIENT_CLASS = - "org.apache.hadoop.hive.metastore.IMetaStoreClient"; protected static final String HIVE_CONF_CLASS = "org.apache.hadoop.hive.conf.HiveConf"; protected static final String CONFIGURATION_CLASS = "org.apache.hadoop.conf.Configuration"; protected static final String METHOD_GET_PROXY = "getProxy"; diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV2.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV2.java new file mode 100644 index 0000000000..383f5a1d85 --- /dev/null +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV2.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gravitino.hive.client; + +import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE2; +import static org.apache.gravitino.hive.client.Util.updateConfigurationFromProperties; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Properties; +import org.apache.gravitino.hive.HivePartition; +import org.apache.gravitino.hive.HiveSchema; +import org.apache.gravitino.hive.HiveTable; +import org.apache.gravitino.hive.client.HiveExceptionConverter.ExceptionTarget; +import org.apache.gravitino.hive.converter.HiveDatabaseConverter; +import org.apache.gravitino.hive.converter.HiveTableConverter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; + +class HiveShimV2 extends HiveShim { + + HiveShimV2(Properties properties) { + super(HIVE2, properties); + } + + HiveShimV2(HiveClientClassLoader.HiveVersion version, Properties properties) { + super(version, properties); + } + + @Override + public IMetaStoreClient createMetaStoreClient(Properties properties) { + try { + ClassLoader classLoader = this.getClass().getClassLoader(); + Class<?> clientClass = classLoader.loadClass(RETRYING_META_STORE_CLIENT_CLASS); + Class<?> hiveConfClass = classLoader.loadClass(HIVE_CONF_CLASS); + Class<?> confClass = classLoader.loadClass(CONFIGURATION_CLASS); + + Object conf = confClass.getDeclaredConstructor().newInstance(); + updateConfigurationFromProperties(properties, (Configuration) conf); + + Constructor<?> hiveConfCtor = hiveConfClass.getConstructor(confClass, Class.class); + Object hiveConfInstance = hiveConfCtor.newInstance(conf, hiveConfClass); + + Method getProxyMethod = clientClass.getMethod(METHOD_GET_PROXY, hiveConfClass, boolean.class); + return (IMetaStoreClient) getProxyMethod.invoke(null, hiveConfInstance, false); + + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException( + e, ExceptionTarget.other("MetaStoreClient")); + } + } + + @Override + public void createDatabase(HiveSchema database) { + try { + Database db = HiveDatabaseConverter.toHiveDb(database); + client.createDatabase(db); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(database.name())); + } + } + + @Override + public HiveSchema getDatabase(String catalogName, String databaseName) { + try { + Database db = client.getDatabase(databaseName); + return HiveDatabaseConverter.fromHiveDB(db); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName)); + } + } + + @Override + public void alterDatabase(String catalogName, String databaseName, HiveSchema database) { + try { + Database db = HiveDatabaseConverter.toHiveDb(database); + client.alterDatabase(databaseName, db); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName)); + } + } + + @Override + public void dropDatabase(String catalogName, String databaseName, boolean cascade) { + try { + client.dropDatabase(databaseName, false, false, cascade); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName)); + } + } + + @Override + public List<String> getAllTables(String catalogName, String databaseName) { + try { + return client.getAllTables(databaseName); + } catch (Exception e) { + throw 
HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName)); + } + } + + @Override + public List<String> listTableNamesByFilter( + String catalogName, String databaseName, String filter, short pageSize) { + try { + return client.listTableNamesByFilter(databaseName, filter, pageSize); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName)); + } + } + + @Override + public HiveTable getTable(String catalogName, String databaseName, String tableName) { + try { + var tb = client.getTable(databaseName, tableName); + return HiveTableConverter.fromHiveTable(tb); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(tableName)); + } + } + + @Override + public void alterTable( + String catalogName, String databaseName, String tableName, HiveTable alteredHiveTable) { + try { + var tb = HiveTableConverter.toHiveTable(alteredHiveTable); + client.alter_table(databaseName, tableName, tb); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(tableName)); + } + } + + @Override + public void dropTable( + String catalogName, + String databaseName, + String tableName, + boolean deleteData, + boolean ifPurge) { + try { + client.dropTable(databaseName, tableName, deleteData, ifPurge); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(tableName)); + } + } + + @Override + public void createTable(HiveTable hiveTable) { + try { + var tb = HiveTableConverter.toHiveTable(hiveTable); + client.createTable(tb); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(hiveTable.name())); + } + } + + @Override + public List<String> listPartitionNames(HiveTable table, short pageSize) { + try { + String databaseName = table.databaseName(); + return client.listPartitionNames(databaseName, table.name(), pageSize); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(table.name())); + } + } + + @Override + public List<HivePartition> listPartitions(HiveTable table, short pageSize) { + try { + String databaseName = table.databaseName(); + var partitions = client.listPartitions(databaseName, table.name(), pageSize); + return partitions.stream().map(p -> HiveTableConverter.fromHivePartition(table, p)).toList(); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(table.name())); + } + } + + @Override + public List<HivePartition> listPartitions( + HiveTable table, List<String> filterPartitionValueList, short pageSize) { + try { + String databaseName = table.databaseName(); + var partitions = + client.listPartitions(databaseName, table.name(), filterPartitionValueList, pageSize); + return partitions.stream().map(p -> HiveTableConverter.fromHivePartition(table, p)).toList(); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(table.name())); + } + } + + @Override + public HivePartition getPartition(HiveTable table, String partitionName) { + try { + String databaseName = table.databaseName(); + var partitionValues = HivePartition.extractPartitionValues(partitionName); + var partition = client.getPartition(databaseName, table.name(), partitionValues); + return HiveTableConverter.fromHivePartition(table, partition); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException( + e, 
ExceptionTarget.partition(partitionName)); + } + } + + @Override + public HivePartition addPartition(HiveTable table, HivePartition partition) { + try { + String databaseName = table.databaseName(); + var hivePartition = HiveTableConverter.toHivePartition(databaseName, table, partition); + var addedPartition = client.add_partition(hivePartition); + return HiveTableConverter.fromHivePartition(table, addedPartition); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException( + e, ExceptionTarget.partition(partition.name())); + } + } + + @Override + public void dropPartition( + String catalogName, + String databaseName, + String tableName, + String partitionName, + boolean deleteData) { + try { + var partitionValues = HivePartition.extractPartitionValues(partitionName); + client.dropPartition(databaseName, tableName, partitionValues, deleteData); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException( + e, ExceptionTarget.partition(partitionName)); + } + } + + @Override + public String getDelegationToken(String finalPrincipalName, String userName) { + try { + return client.getDelegationToken(finalPrincipalName, userName); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException( + e, ExceptionTarget.other(finalPrincipalName)); + } + } + + @Override + public List<HiveTable> getTableObjectsByName( + String catalogName, String databaseName, List<String> allTables) { + try { + // Hive2 doesn't support catalog, so we ignore catalogName and use databaseName + var tables = client.getTableObjectsByName(databaseName, allTables); + return tables.stream().map(HiveTableConverter::fromHiveTable).toList(); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName)); + } + } + + @Override + public List<String> getCatalogs() { + return List.of(); + } + + @Override + public void close() throws Exception { + client.close(); + } +} diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV3.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV3.java new file mode 100644 index 0000000000..6409696a41 --- /dev/null +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV3.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gravitino.hive.client; + +import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE3; +import static org.apache.gravitino.hive.client.Util.updateConfigurationFromProperties; + +import java.lang.reflect.Method; +import java.util.List; +import java.util.Properties; +import org.apache.commons.lang3.reflect.MethodUtils; +import org.apache.gravitino.hive.HivePartition; +import org.apache.gravitino.hive.HiveSchema; +import org.apache.gravitino.hive.HiveTable; +import org.apache.gravitino.hive.client.HiveExceptionConverter.ExceptionTarget; +import org.apache.gravitino.hive.converter.HiveDatabaseConverter; +import org.apache.gravitino.hive.converter.HiveTableConverter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Table; + +class HiveShimV3 extends HiveShimV2 { + + private final Method createDatabaseMethod; + private final Method getDatabaseMethod; + private final Method getAllDatabasesMethod; + private final Method alterDatabaseMethod; + private final Method dropDatabaseMethod; + private final Method getTableMethod; + private final Method createTableMethod; + private final Method alterTableMethod; + private final Method dropTableMethod; + private final Method getAllTablesMethod; + private final Method listTableNamesByFilterMethod; + private final Method listPartitionNamesMethod; + private final Method listPartitionsMethod; + private final Method listPartitionsWithFilterMethod; + private final Method getPartitionMethod; + private final Method addPartitionMethod; + private final Method dropPartitionMethod; + private final Method getTableObjectsByNameMethod; + private final Method databaseSetCatalogNameMethod; + private final Method getCatalogsMethod; + private final Method tableSetCatalogNameMethod; + private final Method partitionSetCatalogNameMethod; + + HiveShimV3(Properties properties) { + super(HIVE3, properties); + try { + // Hive3 database methods with catalog support + this.createDatabaseMethod = + IMetaStoreClient.class.getMethod("createDatabase", Database.class); + this.getDatabaseMethod = + IMetaStoreClient.class.getMethod("getDatabase", String.class, String.class); + this.getAllDatabasesMethod = + IMetaStoreClient.class.getMethod("getAllDatabases", String.class); + this.alterDatabaseMethod = + IMetaStoreClient.class.getMethod( + "alterDatabase", String.class, String.class, Database.class); + this.dropDatabaseMethod = + IMetaStoreClient.class.getMethod( + "dropDatabase", + String.class, + String.class, + boolean.class, + boolean.class, + boolean.class); + + // Hive3 table methods with catalog support + this.getTableMethod = + IMetaStoreClient.class.getMethod("getTable", String.class, String.class, String.class); + this.createTableMethod = + IMetaStoreClient.class.getMethod( + "createTable", org.apache.hadoop.hive.metastore.api.Table.class); + this.alterTableMethod = + IMetaStoreClient.class.getMethod( + "alter_table", + String.class, + String.class, + String.class, + org.apache.hadoop.hive.metastore.api.Table.class); + this.dropTableMethod = + IMetaStoreClient.class.getMethod( + "dropTable", String.class, String.class, String.class, boolean.class, boolean.class); + this.getAllTablesMethod = + IMetaStoreClient.class.getMethod("getAllTables", String.class, String.class); + this.listTableNamesByFilterMethod = + IMetaStoreClient.class.getMethod( + "listTableNamesByFilter", 
String.class, String.class, String.class, int.class); + + // Hive3 partition methods with catalog support (using int for pageSize) + this.listPartitionNamesMethod = + IMetaStoreClient.class.getMethod( + "listPartitionNames", String.class, String.class, String.class, int.class); + this.listPartitionsMethod = + IMetaStoreClient.class.getMethod( + "listPartitions", String.class, String.class, String.class, int.class); + this.listPartitionsWithFilterMethod = + IMetaStoreClient.class.getMethod( + "listPartitions", String.class, String.class, String.class, List.class, int.class); + this.getPartitionMethod = + IMetaStoreClient.class.getMethod( + "getPartition", String.class, String.class, String.class, List.class); + this.addPartitionMethod = + IMetaStoreClient.class.getMethod( + "add_partition", org.apache.hadoop.hive.metastore.api.Partition.class); + this.dropPartitionMethod = + IMetaStoreClient.class.getMethod( + "dropPartition", String.class, String.class, String.class, List.class, boolean.class); + // Hive3 getTableObjectsByName with catalog parameter + this.getTableObjectsByNameMethod = + IMetaStoreClient.class.getMethod( + "getTableObjectsByName", String.class, String.class, List.class); + this.getCatalogsMethod = IMetaStoreClient.class.getMethod("getCatalogs"); + + // SetCatalogName methods for Hive3 + this.databaseSetCatalogNameMethod = + MethodUtils.getAccessibleMethod(Database.class, "setCatalogName", String.class); + this.tableSetCatalogNameMethod = + MethodUtils.getAccessibleMethod( + org.apache.hadoop.hive.metastore.api.Table.class, "setCatName", String.class); + this.partitionSetCatalogNameMethod = + MethodUtils.getAccessibleMethod( + org.apache.hadoop.hive.metastore.api.Partition.class, "setCatName", String.class); + + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.other("HiveShimV3")); + } + } + + @Override + public IMetaStoreClient createMetaStoreClient(Properties properties) { + try { + ClassLoader classLoader = this.getClass().getClassLoader(); + Class<?> clientClass = classLoader.loadClass(RETRYING_META_STORE_CLIENT_CLASS); + Class<?> confClass = classLoader.loadClass(CONFIGURATION_CLASS); + + Object conf = confClass.getDeclaredConstructor().newInstance(); + updateConfigurationFromProperties(properties, (Configuration) conf); + + Method getProxyMethod = clientClass.getMethod(METHOD_GET_PROXY, confClass, boolean.class); + return (IMetaStoreClient) getProxyMethod.invoke(null, conf, false); + + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException( + e, ExceptionTarget.other("MetaStoreClient")); + } + } + + @Override + public void createDatabase(HiveSchema database) { + Database db = HiveDatabaseConverter.toHiveDb(database); + String catalogName = database.catalogName(); + invoke(ExceptionTarget.other(""), db, databaseSetCatalogNameMethod, catalogName); + invoke(ExceptionTarget.schema(database.name()), client, createDatabaseMethod, db); + } + + @Override + public List<String> getAllDatabases(String catalogName) { + return (List<String>) + invoke(ExceptionTarget.catalog(catalogName), client, getAllDatabasesMethod, catalogName); + } + + @Override + public HiveSchema getDatabase(String catalogName, String databaseName) { + Database db = + (Database) + invoke( + ExceptionTarget.schema(databaseName), + client, + getDatabaseMethod, + catalogName, + databaseName); + return HiveDatabaseConverter.fromHiveDB(db); + } + + @Override + public void alterDatabase(String catalogName, String databaseName, HiveSchema database) 
{ + Database db = HiveDatabaseConverter.toHiveDb(database); + invoke(ExceptionTarget.other(""), db, databaseSetCatalogNameMethod, catalogName); + invoke( + ExceptionTarget.schema(databaseName), + client, + alterDatabaseMethod, + catalogName, + databaseName, + db); + } + + @Override + public void dropDatabase(String catalogName, String databaseName, boolean cascade) { + invoke( + ExceptionTarget.schema(databaseName), + client, + dropDatabaseMethod, + catalogName, + databaseName, + true, + false, + cascade); + } + + @Override + public List<String> getAllTables(String catalogName, String databaseName) { + return (List<String>) + invoke( + ExceptionTarget.schema(databaseName), + client, + getAllTablesMethod, + catalogName, + databaseName); + } + + @Override + public List<String> listTableNamesByFilter( + String catalogName, String databaseName, String filter, short pageSize) { + Object pageSizeArg = convertPageSize(listTableNamesByFilterMethod, 3, pageSize); + return (List<String>) + invoke( + ExceptionTarget.schema(databaseName), + client, + listTableNamesByFilterMethod, + catalogName, + databaseName, + filter, + pageSizeArg); + } + + @Override + public HiveTable getTable(String catalogName, String databaseName, String tableName) { + var tb = + (org.apache.hadoop.hive.metastore.api.Table) + invoke( + ExceptionTarget.table(tableName), + client, + getTableMethod, + catalogName, + databaseName, + tableName); + return HiveTableConverter.fromHiveTable(tb); + } + + @Override + public void alterTable( + String catalogName, String databaseName, String tableName, HiveTable alteredHiveTable) { + var tb = HiveTableConverter.toHiveTable(alteredHiveTable); + invoke(ExceptionTarget.other(""), tb, tableSetCatalogNameMethod, catalogName); + invoke( + ExceptionTarget.table(tableName), + client, + alterTableMethod, + catalogName, + databaseName, + tableName, + tb); + } + + @Override + public void dropTable( + String catalogName, + String databaseName, + String tableName, + boolean deleteData, + boolean ifPurge) { + invoke( + ExceptionTarget.table(tableName), + client, + dropTableMethod, + catalogName, + databaseName, + tableName, + deleteData, + ifPurge); + } + + @Override + public void createTable(HiveTable hiveTable) { + String catalogName = hiveTable.catalogName(); + var tb = HiveTableConverter.toHiveTable(hiveTable); + invoke(ExceptionTarget.other(""), tb, tableSetCatalogNameMethod, catalogName); + invoke(ExceptionTarget.table(hiveTable.name()), client, createTableMethod, tb); + } + + @Override + public List<String> listPartitionNames(HiveTable table, short pageSize) { + String catalogName = table.catalogName(); + String databaseName = table.databaseName(); + Object pageSizeArg = convertPageSize(listPartitionNamesMethod, 3, pageSize); + return (List<String>) + invoke( + ExceptionTarget.table(table.name()), + client, + listPartitionNamesMethod, + catalogName, + databaseName, + table.name(), + pageSizeArg); + } + + @Override + public List<HivePartition> listPartitions(HiveTable table, short pageSize) { + String catalogName = table.catalogName(); + String databaseName = table.databaseName(); + Object pageSizeArg = convertPageSize(listPartitionsMethod, 3, pageSize); + var partitions = + (List<org.apache.hadoop.hive.metastore.api.Partition>) + invoke( + ExceptionTarget.table(table.name()), + client, + listPartitionsMethod, + catalogName, + databaseName, + table.name(), + pageSizeArg); + return partitions.stream().map(p -> HiveTableConverter.fromHivePartition(table, p)).toList(); + } + + @Override + public
List<HivePartition> listPartitions( + HiveTable table, List<String> filterPartitionValueList, short pageSize) { + String catalogName = table.catalogName(); + String databaseName = table.databaseName(); + Object pageSizeArg = convertPageSize(listPartitionsWithFilterMethod, 4, pageSize); + var partitions = + (List<org.apache.hadoop.hive.metastore.api.Partition>) + invoke( + ExceptionTarget.table(table.name()), + client, + listPartitionsWithFilterMethod, + catalogName, + databaseName, + table.name(), + filterPartitionValueList, + pageSizeArg); + return partitions.stream().map(p -> HiveTableConverter.fromHivePartition(table, p)).toList(); + } + + @Override + public HivePartition getPartition(HiveTable table, String partitionName) { + String catalogName = table.catalogName(); + String databaseName = table.databaseName(); + var partitionValues = HivePartition.extractPartitionValues(partitionName); + var partition = + (org.apache.hadoop.hive.metastore.api.Partition) + invoke( + ExceptionTarget.partition(partitionName), + client, + getPartitionMethod, + catalogName, + databaseName, + table.name(), + partitionValues); + return HiveTableConverter.fromHivePartition(table, partition); + } + + @Override + public HivePartition addPartition(HiveTable table, HivePartition partition) { + String catalogName = table.catalogName(); + String databaseName = table.databaseName(); + var hivePartition = HiveTableConverter.toHivePartition(databaseName, table, partition); + invoke(ExceptionTarget.other(""), hivePartition, partitionSetCatalogNameMethod, catalogName); + var addedPartition = + (org.apache.hadoop.hive.metastore.api.Partition) + invoke( + ExceptionTarget.partition(partition.name()), + client, + addPartitionMethod, + hivePartition); + return HiveTableConverter.fromHivePartition(table, addedPartition); + } + + @Override + public void dropPartition( + String catalogName, + String databaseName, + String tableName, + String partitionName, + boolean deleteData) { + var partitionValues = HivePartition.extractPartitionValues(partitionName); + invoke( + ExceptionTarget.partition(partitionName), + client, + dropPartitionMethod, + catalogName, + databaseName, + tableName, + partitionValues, + deleteData); + } + + @Override + public List<HiveTable> getTableObjectsByName( + String catalogName, String databaseName, List<String> allTables) { + var tables = + (List<Table>) + invoke( + ExceptionTarget.schema(databaseName), + client, + getTableObjectsByNameMethod, + catalogName, + databaseName, + allTables); + return tables.stream().map(HiveTableConverter::fromHiveTable).toList(); + } + + @Override + public List<String> getCatalogs() { + return (List<String>) invoke(ExceptionTarget.other(""), client, getCatalogsMethod); + } + + /** + * Invokes a method on an object and converts any exception to a Gravitino exception. + * + * @param target Hive object info used in error messages and exception mapping + * @param object The object to invoke the method on + * @param method The method to invoke + * @param args The arguments to pass to the method + * @return The result of the method invocation + */ + private Object invoke(ExceptionTarget target, Object object, Method method, Object... args) { + try { + return method.invoke(object, args); + } catch (Exception e) { + throw HiveExceptionConverter.toGravitinoException(e, target); + } + } + + /** + * Converts pageSize from short to int if the method parameter expects int type. 
+ * + * @param method The method to check parameter types + * @param paramIndex The index of the pageSize parameter + * @param pageSize The pageSize value as short + * @return The pageSize as Object (short or int) + */ + private Object convertPageSize(Method method, int paramIndex, short pageSize) { + if (method.getParameterTypes()[paramIndex] == int.class) { + return (int) pageSize; + } + return pageSize; + } + + @Override + public void close() throws Exception { + client.close(); + } +} diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java index fd16977363..a75b117142 100644 --- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java @@ -27,9 +27,9 @@ public class Util { public static final String HIVE_CONFIG_RESOURCES = "hive.config.resources"; - public static Configuration buildConfigurationFromProperties(Properties properties) { + public static void updateConfigurationFromProperties( + Properties properties, Configuration config) { try { - Configuration config = new Configuration(); String configResources = properties.getProperty(HIVE_CONFIG_RESOURCES); if (StringUtils.isNotBlank(configResources)) { for (String resource : configResources.split(",")) { @@ -41,7 +41,6 @@ public class Util { } properties.forEach((k, v) -> config.set(k.toString(), v.toString())); - return config; } catch (Exception e) { throw new RuntimeException("Failed to create configuration", e); } diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveDatabaseConverter.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveDatabaseConverter.java new file mode 100644 index 0000000000..d3ba428161 --- /dev/null +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveDatabaseConverter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.hive.converter; + +import static org.apache.gravitino.catalog.hive.HiveConstants.LOCATION; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.gravitino.hive.HiveSchema; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.PrincipalType; + +public class HiveDatabaseConverter { + public static HiveSchema fromHiveDB(Database db) { + Preconditions.checkArgument(db != null, "Database cannot be null"); + + Map<String, String> properties = buildSchemaProperties(db); + + // Get audit info from Hive's Database object. Because Hive's database doesn't store create + // time, last modifier and last modified time, we only get creator from Hive's database. + AuditInfo.Builder auditInfoBuilder = AuditInfo.builder(); + Optional.ofNullable(db.getOwnerName()).ifPresent(auditInfoBuilder::withCreator); + + String catalogName = null; + try { + java.lang.reflect.Method getCatalogNameMethod = db.getClass().getMethod("getCatalogName"); + catalogName = (String) getCatalogNameMethod.invoke(db); + } catch (Exception e) { + // Hive2 doesn't have getCatalogName method, catalogName will be null + } + + HiveSchema hiveSchema = + HiveSchema.builder() + .withName(db.getName()) + .withComment(db.getDescription()) + .withProperties(properties) + .withAuditInfo(auditInfoBuilder.build()) + .withCatalogName(catalogName) + .build(); + return hiveSchema; + } + /** + * Build schema properties from a Hive Database object. + * + * @param database The Hive Database object. + * @return A map of schema properties. + */ + public static Map<String, String> buildSchemaProperties(Database database) { + Map<String, String> properties = new HashMap<>(database.getParameters()); + properties.put(LOCATION, database.getLocationUri()); + return properties; + } + + public static Database toHiveDb(HiveSchema hiveSchema) { + Preconditions.checkArgument(hiveSchema != null, "HiveSchema cannot be null"); + Database hiveDb = new Database(); + + hiveDb.setName(hiveSchema.name()); + Optional.ofNullable(hiveSchema.properties().get(LOCATION)).ifPresent(hiveDb::setLocationUri); + Optional.ofNullable(hiveSchema.comment()).ifPresent(hiveDb::setDescription); + + // TODO: Add more privilege info to Hive's Database object after Gravitino supports privilege.
+ hiveDb.setOwnerName(hiveSchema.auditInfo().creator()); + hiveDb.setOwnerType(PrincipalType.USER); + + Map<String, String> parameters = new HashMap<>(hiveSchema.properties()); + parameters.remove(LOCATION); + hiveDb.setParameters(parameters); + + return hiveDb; + } +} diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java index f58080c85b..5c7079bf33 100644 --- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java @@ -18,14 +18,32 @@ */ package org.apache.gravitino.hive.converter; +import static org.apache.gravitino.catalog.hive.HiveConstants.COMMENT; +import static org.apache.gravitino.catalog.hive.HiveConstants.EXTERNAL; +import static org.apache.gravitino.catalog.hive.HiveConstants.FORMAT; +import static org.apache.gravitino.catalog.hive.HiveConstants.INPUT_FORMAT; +import static org.apache.gravitino.catalog.hive.HiveConstants.LOCATION; +import static org.apache.gravitino.catalog.hive.HiveConstants.OUTPUT_FORMAT; +import static org.apache.gravitino.catalog.hive.HiveConstants.SERDE_LIB; +import static org.apache.gravitino.catalog.hive.HiveConstants.SERDE_NAME; +import static org.apache.gravitino.catalog.hive.HiveConstants.SERDE_PARAMETER_PREFIX; +import static org.apache.gravitino.catalog.hive.HiveConstants.TABLE_TYPE; import static org.apache.gravitino.rel.expressions.transforms.Transforms.identity; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.gravitino.connector.BaseColumn; +import org.apache.gravitino.catalog.hive.StorageFormat; +import org.apache.gravitino.hive.HiveColumn; +import org.apache.gravitino.hive.HivePartition; +import org.apache.gravitino.hive.HiveTable; import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.expressions.Expression; @@ -36,12 +54,221 @@ import org.apache.gravitino.rel.expressions.sorts.SortDirection; import org.apache.gravitino.rel.expressions.sorts.SortOrder; import org.apache.gravitino.rel.expressions.sorts.SortOrders; import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.types.Type; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; public class HiveTableConverter { - public static AuditInfo getAuditInfo(Table table) { + + public static HiveTable fromHiveTable(org.apache.hadoop.hive.metastore.api.Table table) { + Preconditions.checkArgument(table != null, "Table cannot be null"); + AuditInfo auditInfo = HiveTableConverter.getAuditInfo(table); + + Distribution distribution = HiveTableConverter.getDistribution(table); + + SortOrder[] sortOrders = HiveTableConverter.getSortOrders(table); + + Column[] columns = 
HiveTableConverter.getColumns(table); + + Transform[] partitioning = HiveTableConverter.getPartitioning(table); + + String catalogName = null; + try { + java.lang.reflect.Method getCatNameMethod = table.getClass().getMethod("getCatName"); + catalogName = (String) getCatNameMethod.invoke(table); + } catch (Exception e) { + // Hive2 doesn't have getCatName method, catalogName will be null + } + + HiveTable hiveTable = + HiveTable.builder() + .withName(table.getTableName()) + .withComment(table.getParameters().get(COMMENT)) + .withProperties(buildTableProperties(table)) + .withColumns(columns) + .withDistribution(distribution) + .withSortOrders(sortOrders) + .withAuditInfo(auditInfo) + .withPartitioning(partitioning) + .withCatalogName(catalogName) + .withDatabaseName(table.getDbName()) + .build(); + return hiveTable; + } + + private static Map<String, String> buildTableProperties( + org.apache.hadoop.hive.metastore.api.Table table) { + Map<String, String> properties = Maps.newHashMap(table.getParameters()); + + Optional.ofNullable(table.getTableType()).ifPresent(t -> properties.put(TABLE_TYPE, t)); + + StorageDescriptor sd = table.getSd(); + properties.put(LOCATION, sd.getLocation()); + properties.put(INPUT_FORMAT, sd.getInputFormat()); + properties.put(OUTPUT_FORMAT, sd.getOutputFormat()); + + SerDeInfo serdeInfo = sd.getSerdeInfo(); + Optional.ofNullable(serdeInfo.getName()).ifPresent(name -> properties.put(SERDE_NAME, name)); + Optional.ofNullable(serdeInfo.getSerializationLib()) + .ifPresent(lib -> properties.put(SERDE_LIB, lib)); + Optional.ofNullable(serdeInfo.getParameters()) + .ifPresent(p -> p.forEach((k, v) -> properties.put(SERDE_PARAMETER_PREFIX + k, v))); + + return properties; + } + + public static org.apache.hadoop.hive.metastore.api.Table toHiveTable(HiveTable hiveTable) { + Preconditions.checkArgument(hiveTable != null, "HiveTable cannot be null"); + String dbName = hiveTable.databaseName(); + Preconditions.checkArgument(dbName != null, "Database name cannot be null"); + + org.apache.hadoop.hive.metastore.api.Table table = + new org.apache.hadoop.hive.metastore.api.Table(); + + table.setTableName(hiveTable.name()); + table.setDbName(dbName); + String tableType = + hiveTable.properties().getOrDefault(TABLE_TYPE, String.valueOf(TableType.MANAGED_TABLE)); + table.setTableType(tableType.toUpperCase()); + + List<FieldSchema> partitionFields = + hiveTable.partitionFieldNames().stream() + .map(fieldName -> buildPartitionKeyField(fieldName, hiveTable)) + .collect(Collectors.toList()); + table.setSd(buildStorageDescriptor(hiveTable, partitionFields)); + table.setParameters(buildTableParameters(hiveTable)); + table.setPartitionKeys(partitionFields); + + // Set AuditInfo to Hive's Table object. Hive's Table doesn't support setting last modifier + // and last modified time, so we only set creator and create time. 
+ table.setOwner(hiveTable.auditInfo().creator());
+ table.setCreateTime(Math.toIntExact(hiveTable.auditInfo().createTime().getEpochSecond()));
+
+ return table;
+ }
+
+ private static FieldSchema buildPartitionKeyField(String fieldName, HiveTable table) {
+ Column partitionColumn =
+ Arrays.stream(table.columns())
+ .filter(c -> c.name().equals(fieldName))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format("Partition column %s does not exist", fieldName)));
+ return new FieldSchema(
+ partitionColumn.name(),
+ HiveDataTypeConverter.CONVERTER
+ .fromGravitino(partitionColumn.dataType())
+ .getQualifiedName(),
+ partitionColumn.comment());
+ }
+
+ private static StorageDescriptor buildStorageDescriptor(
+ HiveTable table, List<FieldSchema> partitionFields) {
+ StorageDescriptor strgDesc = new StorageDescriptor();
+ List<String> partitionKeys =
+ partitionFields.stream().map(FieldSchema::getName).collect(Collectors.toList());
+ strgDesc.setCols(
+ Arrays.stream(table.columns())
+ .filter(c -> !partitionKeys.contains(c.name()))
+ .map(
+ c ->
+ new FieldSchema(
+ c.name(),
+ HiveDataTypeConverter.CONVERTER
+ .fromGravitino(c.dataType())
+ .getQualifiedName(),
+ c.comment()))
+ .collect(Collectors.toList()));
+
+ // Set `location` only when it is present; a null location results in an NPE when calling the
+ // HMS `alterTable` interface
+ Optional.ofNullable(table.properties().get(LOCATION)).ifPresent(strgDesc::setLocation);
+
+ strgDesc.setSerdeInfo(buildSerDeInfo(table));
+ StorageFormat storageFormat =
+ StorageFormat.valueOf(
+ table
+ .properties()
+ .getOrDefault(FORMAT, String.valueOf(StorageFormat.TEXTFILE))
+ .toUpperCase());
+ strgDesc.setInputFormat(storageFormat.getInputFormat());
+ strgDesc.setOutputFormat(storageFormat.getOutputFormat());
+ // An INPUT_FORMAT or OUTPUT_FORMAT specified individually overrides the inputFormat and
+ // outputFormat derived from FORMAT
+ Optional.ofNullable(table.properties().get(INPUT_FORMAT)).ifPresent(strgDesc::setInputFormat);
+ Optional.ofNullable(table.properties().get(OUTPUT_FORMAT)).ifPresent(strgDesc::setOutputFormat);
+
+ if (table.sortOrder() != null && table.sortOrder().length > 0) {
+ for (SortOrder sortOrder : table.sortOrder()) {
+ String columnName = ((NamedReference.FieldReference) sortOrder.expression()).fieldName()[0];
+ strgDesc.addToSortCols(
+ new Order(columnName, sortOrder.direction() == SortDirection.ASCENDING ? 
1 : 0));
+ }
+ }
+
+ if (table.distribution() != null && !Distributions.NONE.equals(table.distribution())) {
+ strgDesc.setBucketCols(
+ Arrays.stream(table.distribution().expressions())
+ .map(t -> ((NamedReference.FieldReference) t).fieldName()[0])
+ .collect(Collectors.toList()));
+ strgDesc.setNumBuckets(table.distribution().number());
+ }
+
+ return strgDesc;
+ }
+
+ private static SerDeInfo buildSerDeInfo(HiveTable table) {
+ SerDeInfo serDeInfo = new SerDeInfo();
+ serDeInfo.setName(table.properties().getOrDefault(SERDE_NAME, table.name()));
+
+ StorageFormat storageFormat =
+ StorageFormat.valueOf(
+ table
+ .properties()
+ .getOrDefault(FORMAT, String.valueOf(StorageFormat.TEXTFILE))
+ .toUpperCase());
+ serDeInfo.setSerializationLib(storageFormat.getSerde());
+ // A SERDE_LIB specified individually overrides the serialization library derived from FORMAT
+ Optional.ofNullable(table.properties().get(SERDE_LIB))
+ .ifPresent(serDeInfo::setSerializationLib);
+
+ table.properties().entrySet().stream()
+ .filter(e -> e.getKey().startsWith(SERDE_PARAMETER_PREFIX))
+ .forEach(
+ e ->
+ serDeInfo.putToParameters(
+ e.getKey().substring(SERDE_PARAMETER_PREFIX.length()), e.getValue()));
+ return serDeInfo;
+ }
+
+ private static Map<String, String> buildTableParameters(HiveTable table) {
+ Map<String, String> parameters = Maps.newHashMap(table.properties());
+ Optional.ofNullable(table.comment()).ifPresent(c -> parameters.put(COMMENT, c));
+
+ if (TableType.EXTERNAL_TABLE.name().equalsIgnoreCase(table.properties().get(TABLE_TYPE))) {
+ parameters.put(EXTERNAL, "TRUE");
+ } else {
+ parameters.put(EXTERNAL, "FALSE");
+ }
+
+ parameters.remove(LOCATION);
+ parameters.remove(TABLE_TYPE);
+ parameters.remove(INPUT_FORMAT);
+ parameters.remove(OUTPUT_FORMAT);
+ parameters.remove(SERDE_NAME);
+ parameters.remove(SERDE_LIB);
+ parameters.remove(FORMAT);
+ parameters.keySet().removeIf(k -> k.startsWith(SERDE_PARAMETER_PREFIX));
+ return parameters;
+ }
+
+ public static AuditInfo getAuditInfo(org.apache.hadoop.hive.metastore.api.Table table) {
 // Get audit info from Hive's Table object. Because Hive's table doesn't store last modifier
 // and last modified time, we only get creator and create time from Hive's table. 
AuditInfo.Builder auditInfoBuilder = AuditInfo.builder();
@@ -52,7 +279,7 @@ public class HiveTableConverter {
 return auditInfoBuilder.build();
 }
 
- public static Distribution getDistribution(Table table) {
+ public static Distribution getDistribution(org.apache.hadoop.hive.metastore.api.Table table) {
 StorageDescriptor sd = table.getSd();
 Distribution distribution = Distributions.NONE;
 if (sd.getBucketCols() != null && !sd.getBucketCols().isEmpty()) {
@@ -65,7 +292,7 @@ public class HiveTableConverter {
 return distribution;
 }
 
- public static SortOrder[] getSortOrders(Table table) {
+ public static SortOrder[] getSortOrders(org.apache.hadoop.hive.metastore.api.Table table) {
 SortOrder[] sortOrders = SortOrders.NONE;
 StorageDescriptor sd = table.getSd();
 if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
@@ -81,15 +308,13 @@ public class HiveTableConverter {
 return sortOrders;
 }
 
- public static Transform[] getPartitioning(Table table) {
+ public static Transform[] getPartitioning(org.apache.hadoop.hive.metastore.api.Table table) {
 return table.getPartitionKeys().stream()
 .map(p -> identity(p.getName()))
 .toArray(Transform[]::new);
 }
 
- public static <
- BUILDER extends BaseColumn.BaseColumnBuilder<BUILDER, COLUMN>, COLUMN extends BaseColumn>
- Column[] getColumns(Table table, BUILDER columnBuilder) {
+ public static Column[] getColumns(org.apache.hadoop.hive.metastore.api.Table table) {
 StorageDescriptor sd = table.getSd();
 // Collect column names from sd.getCols() to check for duplicates
 Set<String> columnNames =
@@ -99,21 +324,65 @@ public class HiveTableConverter {
 sd.getCols().stream()
 .map(
 f ->
- columnBuilder
- .withName(f.getName())
- .withType(HiveDataTypeConverter.CONVERTER.toGravitino(f.getType()))
- .withComment(f.getComment())
- .build()),
+ buildColumn(
+ f.getName(),
+ HiveDataTypeConverter.CONVERTER.toGravitino(f.getType()),
+ f.getComment())),
 table.getPartitionKeys().stream()
 // Filter out partition keys that already exist in sd.getCols()
 .filter(p -> !columnNames.contains(p.getName()))
 .map(
 p ->
- columnBuilder
- .withName(p.getName())
- .withType(HiveDataTypeConverter.CONVERTER.toGravitino(p.getType()))
- .withComment(p.getComment())
- .build()))
+ buildColumn(
+ p.getName(),
+ HiveDataTypeConverter.CONVERTER.toGravitino(p.getType()),
+ p.getComment())))
 .toArray(Column[]::new);
 }
+
+ private static Column buildColumn(String name, Type type, String comment) {
+ HiveColumn.Builder builder =
+ HiveColumn.builder().withName(name).withType(type).withNullable(true);
+ if (comment != null) {
+ builder.withComment(comment);
+ }
+ return builder.build();
+ }
+
+ public static HivePartition fromHivePartition(
+ HiveTable table, org.apache.hadoop.hive.metastore.api.Partition partition) {
+ Preconditions.checkArgument(table != null, "Table cannot be null");
+ Preconditions.checkArgument(partition != null, "Partition cannot be null");
+ List<String> partitionColumns = table.partitionFieldNames();
+ String partitionName = FileUtils.makePartName(partitionColumns, partition.getValues());
+ // todo: support partition properties metadata to capture more partition information
+ return HivePartition.identity(partitionName, partition.getParameters());
+ }
+
+ public static org.apache.hadoop.hive.metastore.api.Partition toHivePartition(
+ String dbName, HiveTable table, HivePartition partition) {
+ Preconditions.checkArgument(dbName != null, "Database name cannot be null");
+ Preconditions.checkArgument(table != null, "Table cannot be null");
+ Preconditions.checkArgument(partition != 
null, "Partition cannot be null"); + org.apache.hadoop.hive.metastore.api.Partition hivePartition = + new org.apache.hadoop.hive.metastore.api.Partition(); + hivePartition.setDbName(dbName); + hivePartition.setTableName(table.name()); + + List<FieldSchema> partitionFields = + table.partitionFieldNames().stream() + .map(fieldName -> buildPartitionKeyField(fieldName, table)) + .collect(Collectors.toList()); + // todo: support custom serde and location if necessary + StorageDescriptor sd = buildStorageDescriptor(table, partitionFields); + // The location will be automatically generated by Hive Metastore + sd.setLocation(null); + + hivePartition.setSd(sd); + hivePartition.setParameters(partition.properties()); + + List<String> values = HivePartition.extractPartitionValues(partition.name()); + hivePartition.setValues(values); + return hivePartition; + } } diff --git a/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java b/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java index f0882e35d9..3b9945807b 100644 --- a/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java +++ b/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java @@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.Arrays; import java.util.List; -import org.apache.gravitino.connector.BaseColumn; import org.apache.gravitino.rel.Column; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -53,9 +52,7 @@ public class TestHiveTableConverter { new FieldSchema("month", "int", "Month partition")); table.setPartitionKeys(partitionKeys); - // Get columns using a test column builder - TestColumnBuilder builder = new TestColumnBuilder(); - Column[] columns = HiveTableConverter.getColumns(table, builder); + Column[] columns = HiveTableConverter.getColumns(table); // Should have all 5 columns (3 regular + 2 partition) assertEquals(5, columns.length); @@ -90,9 +87,7 @@ public class TestHiveTableConverter { ); table.setPartitionKeys(partitionKeys); - // Get columns using a test column builder - TestColumnBuilder builder = new TestColumnBuilder(); - Column[] columns = HiveTableConverter.getColumns(table, builder); + Column[] columns = HiveTableConverter.getColumns(table); // Should have only 4 columns (3 regular + 1 unique partition) // The duplicates (date and name) should not be added again @@ -126,9 +121,7 @@ public class TestHiveTableConverter { new FieldSchema("col3", "double", "Column 3 partition")); table.setPartitionKeys(partitionKeys); - // Get columns using a test column builder - TestColumnBuilder builder = new TestColumnBuilder(); - Column[] columns = HiveTableConverter.getColumns(table, builder); + Column[] columns = HiveTableConverter.getColumns(table); // Should have only 3 columns (all from regular columns, no duplicates from partition keys) assertEquals(3, columns.length); @@ -154,59 +147,11 @@ public class TestHiveTableConverter { // Empty partition keys table.setPartitionKeys(Arrays.asList()); - // Get columns using a test column builder - TestColumnBuilder builder = new TestColumnBuilder(); - Column[] columns = HiveTableConverter.getColumns(table, builder); + Column[] columns = HiveTableConverter.getColumns(table); // Should have only 2 columns from regular columns assertEquals(2, columns.length); 
assertEquals("id", columns[0].name()); assertEquals("name", columns[1].name()); } - - // Test column implementation for testing - static class TestColumn extends BaseColumn { - private TestColumn() {} - - static class Builder extends BaseColumn.BaseColumnBuilder<Builder, TestColumn> { - @Override - protected TestColumn internalBuild() { - TestColumn column = new TestColumn(); - // Use reflection to set protected fields - try { - java.lang.reflect.Field nameField = BaseColumn.class.getDeclaredField("name"); - nameField.setAccessible(true); - nameField.set(column, name); - - java.lang.reflect.Field commentField = BaseColumn.class.getDeclaredField("comment"); - commentField.setAccessible(true); - commentField.set(column, comment); - - java.lang.reflect.Field dataTypeField = BaseColumn.class.getDeclaredField("dataType"); - dataTypeField.setAccessible(true); - dataTypeField.set(column, dataType); - - java.lang.reflect.Field nullableField = BaseColumn.class.getDeclaredField("nullable"); - nullableField.setAccessible(true); - nullableField.set(column, nullable); - - java.lang.reflect.Field autoIncrementField = - BaseColumn.class.getDeclaredField("autoIncrement"); - autoIncrementField.setAccessible(true); - autoIncrementField.set(column, autoIncrement); - - java.lang.reflect.Field defaultValueField = - BaseColumn.class.getDeclaredField("defaultValue"); - defaultValueField.setAccessible(true); - defaultValueField.set(column, defaultValue); - } catch (Exception e) { - throw new RuntimeException("Failed to build TestColumn", e); - } - return column; - } - } - } - - // Test column builder helper - static class TestColumnBuilder extends TestColumn.Builder {} }

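The getCatName probe in fromHiveTable is the essence of the Hive2/3 shim approach: reflectively look up the Hive3-only accessor and fall back when it is absent. The sketch below shows the same idea with the probe cached so reflection runs once per accessor rather than once per table; the HmsCompat helper is hypothetical and not in this patch, while getCatName itself is the real Hive3 method used above.

    import java.lang.reflect.Method;
    import java.util.Optional;

    final class HmsCompat {
      // Probe once at class-load time; null means the accessor doesn't exist (Hive2 jars).
      private static final Method GET_CAT_NAME = probe("getCatName");

      private static Method probe(String name) {
        try {
          return org.apache.hadoop.hive.metastore.api.Table.class.getMethod(name);
        } catch (NoSuchMethodException e) {
          return null; // Hive2 client jars on the classpath
        }
      }

      /** Returns the catalog name on Hive3, or empty on Hive2. */
      static Optional<String> catalogName(org.apache.hadoop.hive.metastore.api.Table t) {
        if (GET_CAT_NAME == null) {
          return Optional.empty();
        }
        try {
          return Optional.ofNullable((String) GET_CAT_NAME.invoke(t));
        } catch (ReflectiveOperationException e) {
          return Optional.empty();
        }
      }
    }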