This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch internal-main
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit c503e45a73694d0bab50731be647dfa0cba7925b
Author: Yuhui <[email protected]>
AuthorDate: Thu Dec 11 11:26:26 2025 +0800

    [#9444] feat (hive-catalog): Implement the Hive shim class to access Hive2/3 (#9446)
    
    ### What changes were proposed in this pull request?
    
    Implement the Hive shim class to access Hive2/3
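    
    A minimal sketch of how a caller can reach a Hive2 or Hive3 metastore through the
    shim layer added here (the `main` wrapper and the metastore URI are illustrative
    assumptions; `HiveClientImpl` picks `HiveShimV2` or `HiveShimV3` from the version
    passed in):
    
    ```java
    import java.util.List;
    import java.util.Properties;
    import org.apache.gravitino.hive.client.HiveClient;
    import org.apache.gravitino.hive.client.HiveClientClassLoader;
    import org.apache.gravitino.hive.client.HiveClientImpl;
    
    public class ShimExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder URI; point this at a real Hive Metastore.
        props.setProperty("hive.metastore.uris", "thrift://localhost:9083");
    
        // HIVE2 -> HiveShimV2 (direct IMetaStoreClient calls);
        // HIVE3 -> HiveShimV3 (catalog-aware methods resolved via reflection).
        HiveClient client = new HiveClientImpl(HiveClientClassLoader.HiveVersion.HIVE3, props);
        List<String> databases = client.getAllDatabases("hive");
        databases.forEach(System.out::println);
      }
    }
    ```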
    
    ### Why are the changes needed?
    
    Fix: #9444
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Add UTs
---
 .../java/org/apache/gravitino/hive/HiveColumn.java |  63 +++
 .../java/org/apache/gravitino/hive/HiveTable.java  |  35 ++
 .../gravitino/hive/client/HiveClientFactory.java   |   5 +-
 .../gravitino/hive/client/HiveClientImpl.java      |  97 +++--
 .../hive/client/HiveExceptionConverter.java        | 153 ++++++-
 .../org/apache/gravitino/hive/client/HiveShim.java |   2 -
 .../apache/gravitino/hive/client/HiveShimV2.java   | 280 +++++++++++++
 .../apache/gravitino/hive/client/HiveShimV3.java   | 463 +++++++++++++++++++++
 .../org/apache/gravitino/hive/client/Util.java     |   5 +-
 .../hive/converter/HiveDatabaseConverter.java      |  94 +++++
 .../hive/converter/HiveTableConverter.java         | 307 +++++++++++++-
 .../hive/converter/TestHiveTableConverter.java     |  63 +--
 12 files changed, 1456 insertions(+), 111 deletions(-)

diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveColumn.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveColumn.java
new file mode 100644
index 0000000000..69b97abd44
--- /dev/null
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveColumn.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive;
+
+import lombok.EqualsAndHashCode;
+import org.apache.gravitino.connector.BaseColumn;
+
+/** Represents a column in an Apache Hive Metastore catalog. */
+@EqualsAndHashCode(callSuper = true)
+public class HiveColumn extends BaseColumn {
+
+  private HiveColumn() {}
+
+  /** A builder class for constructing HiveColumn instances. */
+  public static class Builder extends BaseColumnBuilder<Builder, HiveColumn> {
+
+    /** Creates a new instance of {@link Builder}. */
+    private Builder() {}
+
+    /**
+     * Internal method to build a HiveColumn instance using the provided values.
+     *
+     * @return A new HiveColumn instance with the configured values.
+     */
+    @Override
+    protected HiveColumn internalBuild() {
+      HiveColumn hiveColumn = new HiveColumn();
+
+      hiveColumn.name = name;
+      hiveColumn.comment = comment;
+      hiveColumn.dataType = dataType;
+      hiveColumn.nullable = nullable;
+      hiveColumn.autoIncrement = autoIncrement;
+      hiveColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET : defaultValue;
+      return hiveColumn;
+    }
+  }
+
+  /**
+   * Creates a new instance of {@link Builder}.
+   *
+   * @return The new instance.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+}
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java
index 59ec54bfc4..b37485553d 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java
@@ -22,13 +22,20 @@ import static org.apache.gravitino.catalog.hive.HiveConstants.COMMENT;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 import lombok.ToString;
 import org.apache.gravitino.catalog.hive.TableType;
 import org.apache.gravitino.connector.BaseTable;
 import org.apache.gravitino.connector.ProxyPlugin;
 import org.apache.gravitino.connector.TableOperations;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
 
 /** Represents an Apache Hive Table entity in the Hive Metastore catalog. */
 @ToString
@@ -129,4 +136,32 @@ public class HiveTable extends BaseTable {
   public static Builder builder() {
     return new Builder();
   }
+
+  /**
+   * Returns all partition field names defined on this table in declaration order. Callers that need
+   * uniqueness can convert the returned list to a {@link java.util.Set}.
+   */
+  public List<String> partitionFieldNames() {
+    if (partitioning() == null) {
+      return Collections.emptyList();
+    }
+
+    return Arrays.stream(partitioning())
+        .flatMap(transform -> extractPartitionFieldNames(transform).stream())
+        .collect(Collectors.toList());
+  }
+
+  private static List<String> extractPartitionFieldNames(Transform partitioning) {
+    if (partitioning instanceof Transforms.IdentityTransform) {
+      return Arrays.asList(((Transforms.IdentityTransform) partitioning).fieldName());
+    }
+
+    if (partitioning.arguments().length > 0
+        && partitioning.arguments()[0] instanceof NamedReference.FieldReference fieldReference) {
+      return Arrays.asList(fieldReference.fieldName());
+    }
+
+    throw new IllegalArgumentException(
+        String.format("Unsupported partition transform type: %s", 
partitioning.getClass()));
+  }
 }
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java
index 1c74ca908f..4442125d9e 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java
@@ -22,7 +22,7 @@ package org.apache.gravitino.hive.client;
 import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_URIS;
 import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE2;
 import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE3;
-import static org.apache.gravitino.hive.client.Util.buildConfigurationFromProperties;
+import static org.apache.gravitino.hive.client.Util.updateConfigurationFromProperties;
 
 import com.google.common.base.Preconditions;
 import java.lang.reflect.Constructor;
@@ -58,7 +58,8 @@ public final class HiveClientFactory {
     this.properties = properties;
 
     try {
-      this.hadoopConf = buildConfigurationFromProperties(properties);
+      this.hadoopConf = new Configuration();
+      updateConfigurationFromProperties(properties, hadoopConf);
     } catch (Exception e) {
       throw new RuntimeException("Failed to initialize HiveClientFactory", e);
     }
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java
index 310e6eb258..935f1c85c5 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.hive.client;
 
 import java.util.List;
+import java.util.Properties;
 import org.apache.gravitino.hive.HivePartition;
 import org.apache.gravitino.hive.HiveSchema;
 import org.apache.gravitino.hive.HiveTable;
@@ -29,44 +30,72 @@ import org.apache.hadoop.security.UserGroupInformation;
  * partition operations.
  */
 public class HiveClientImpl implements HiveClient {
+
+  private final HiveShim shim;
+
+  public HiveClientImpl(HiveClientClassLoader.HiveVersion hiveVersion, Properties properties) {
+    switch (hiveVersion) {
+      case HIVE2:
+        {
+          shim = new HiveShimV2(properties);
+          break;
+        }
+      case HIVE3:
+        {
+          shim = new HiveShimV3(properties);
+          break;
+        }
+      default:
+        throw new IllegalArgumentException("Unsupported Hive version: " + 
hiveVersion);
+    }
+  }
+
   @Override
-  public void createDatabase(HiveSchema database) {}
+  public List<String> getAllDatabases(String catalogName) {
+    return shim.getAllDatabases(catalogName);
+  }
 
   @Override
-  public HiveSchema getDatabase(String catalogName, String databaseName) {
-    return null;
+  public void createDatabase(HiveSchema database) {
+    shim.createDatabase(database);
   }
 
   @Override
-  public List<String> getAllDatabases(String catalogName) {
-    return List.of();
+  public HiveSchema getDatabase(String catalogName, String databaseName) {
+    return shim.getDatabase(catalogName, databaseName);
   }
 
   @Override
-  public void alterDatabase(String catalogName, String databaseName, HiveSchema database) {}
+  public void alterDatabase(String catalogName, String databaseName, HiveSchema database) {
+    shim.alterDatabase(catalogName, databaseName, database);
+  }
 
   @Override
-  public void dropDatabase(String catalogName, String databaseName, boolean cascade) {}
+  public void dropDatabase(String catalogName, String databaseName, boolean cascade) {
+    shim.dropDatabase(catalogName, databaseName, cascade);
+  }
 
   @Override
   public List<String> getAllTables(String catalogName, String databaseName) {
-    return List.of();
+    return shim.getAllTables(catalogName, databaseName);
   }
 
   @Override
   public List<String> listTableNamesByFilter(
       String catalogName, String databaseName, String filter, short pageSize) {
-    return List.of();
+    return shim.listTableNamesByFilter(catalogName, databaseName, filter, pageSize);
   }
 
   @Override
   public HiveTable getTable(String catalogName, String databaseName, String tableName) {
-    return null;
+    return shim.getTable(catalogName, databaseName, tableName);
   }
 
   @Override
   public void alterTable(
-      String catalogName, String databaseName, String tableName, HiveTable alteredHiveTable) {}
+      String catalogName, String databaseName, String tableName, HiveTable alteredHiveTable) {
+    shim.alterTable(catalogName, databaseName, tableName, alteredHiveTable);
+  }
 
   @Override
   public void dropTable(
@@ -74,62 +103,82 @@ public class HiveClientImpl implements HiveClient {
       String databaseName,
       String tableName,
       boolean deleteData,
-      boolean ifPurge) {}
+      boolean ifPurge) {
+    shim.dropTable(catalogName, databaseName, tableName, deleteData, ifPurge);
+  }
 
   @Override
-  public void createTable(HiveTable hiveTable) {}
+  public void createTable(HiveTable hiveTable) {
+    shim.createTable(hiveTable);
+  }
 
   @Override
   public List<String> listPartitionNames(HiveTable table, short pageSize) {
-    return List.of();
+    return shim.listPartitionNames(table, pageSize);
   }
 
   @Override
   public List<HivePartition> listPartitions(HiveTable table, short pageSize) {
-    return List.of();
+    return shim.listPartitions(table, pageSize);
   }
 
   @Override
   public List<HivePartition> listPartitions(
       HiveTable table, List<String> filterPartitionValueList, short pageSize) {
-    return List.of();
+    return shim.listPartitions(table, filterPartitionValueList, pageSize);
   }
 
   @Override
   public HivePartition getPartition(HiveTable table, String partitionName) {
-    return null;
+    return shim.getPartition(table, partitionName);
   }
 
   @Override
   public HivePartition addPartition(HiveTable table, HivePartition partition) {
-    return null;
+    return shim.addPartition(table, partition);
   }
 
   @Override
   public void dropPartition(
-      String catalogName, String databaseName, String tableName, String partitionName, boolean b) {}
+      String catalogName,
+      String databaseName,
+      String tableName,
+      String partitionName,
+      boolean deleteData) {
+    shim.dropPartition(catalogName, databaseName, tableName, partitionName, deleteData);
+  }
 
   @Override
   public String getDelegationToken(String finalPrincipalName, String userName) {
-    return "";
+    return shim.getDelegationToken(finalPrincipalName, userName);
   }
 
   @Override
   public List<HiveTable> getTableObjectsByName(
       String catalogName, String databaseName, List<String> allTables) {
-    return List.of();
+    return shim.getTableObjectsByName(catalogName, databaseName, allTables);
   }
 
   @Override
   public List<String> getCatalogs() {
-    return List.of();
+    return shim.getCatalogs();
   }
 
   @Override
-  public void close() {}
+  public void close() {
+    try {
+      shim.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to close HiveClient", e);
+    }
+  }
 
   @Override
   public UserGroupInformation getUser() {
-    return null;
+    try {
+      return UserGroupInformation.getCurrentUser();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get current user", e);
+    }
   }
 }
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java
index 5e4c70885f..9371e9c8da 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java
@@ -18,6 +18,22 @@
  */
 package org.apache.gravitino.hive.client;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.Locale;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.gravitino.exceptions.AlreadyExistsException;
+import org.apache.gravitino.exceptions.ConnectionFailedException;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+
 /**
  * Utility class to convert Hive exceptions to Gravitino exceptions. This class handles various
  * types of exceptions that can be thrown by Hive Metastore operations, including:
@@ -32,7 +48,7 @@ package org.apache.gravitino.hive.client;
  */
 public class HiveExceptionConverter {
 
-  enum TargetType {
+  private enum TargetType {
     TABLE,
     SCHEMA,
     PARTITION,
@@ -79,6 +95,15 @@ public class HiveExceptionConverter {
     }
   }
 
+  private static final Set<String> NO_SUCH_EXCEPTION_SET =
+      Set.of(
+          "NoSuchObjectException",
+          "UnknownTableException",
+          "UnknownDBException",
+          "UnknownPartitionException",
+          "InvalidObjectException",
+          "InvalidPartitionException");
+
   private HiveExceptionConverter() {}
 
   /**
@@ -89,6 +114,130 @@ public class HiveExceptionConverter {
    * @return A Gravitino exception
    */
   public static RuntimeException toGravitinoException(Exception e, ExceptionTarget target) {
-    return null;
+    Throwable cause = unwrapException(e);
+    return convertException(cause, target);
+  }
+
+  /**
+   * Unwraps nested exceptions, especially InvocationTargetException from reflection calls.
+   *
+   * @param e The exception to unwrap
+   * @return The unwrapped exception
+   */
+  private static Throwable unwrapException(Exception e) {
+    Throwable cause = e;
+    if (e instanceof InvocationTargetException) {
+      InvocationTargetException ite = (InvocationTargetException) e;
+      cause = ite.getTargetException();
+      if (cause == null) {
+        cause = e;
+      }
+    }
+    return cause;
+  }
+
+  /**
+   * Converts the exception to the appropriate Gravitino exception based on its type.
+   *
+   * @param cause The exception cause
+   * @param target The target Hive object of the operation
+   * @return A Gravitino exception
+   */
+  private static RuntimeException convertException(Throwable cause, ExceptionTarget target) {
+    if (cause instanceof RuntimeException && cause.getCause() instanceof Exception) {
+      return toGravitinoException((Exception) cause.getCause(), target);
+    }
+
+    String message = cause.getMessage();
+    String lowerMessage = message != null ? message.toLowerCase(Locale.ROOT) : "";
+    String exceptionClassName = cause.getClass().getName();
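+    // exceptionClassName is the fully qualified class name: the contains() checks below
+    // match Hive/Thrift exception types by substring, without compiling against them,
+    // while NO_SUCH_EXCEPTION_SET is consulted with the simple class name.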
+
+    if (exceptionClassName.contains("AlreadyExistsException")) {
+      return toAlreadyExistsException(cause, target, message);
+    }
+
+    if (NO_SUCH_EXCEPTION_SET.contains(cause.getClass().getSimpleName())) {
+      return toNoSuchObjectException(cause, target, message);
+    }
+
+    if (exceptionClassName.contains("InvalidOperationException")) {
+      if (isNonEmptySchemaMessage(lowerMessage)) {
+        return new NonEmptySchemaException(
+            cause, "Hive schema %s is not empty in Hive Metastore", 
target.name());
+      }
+      return new IllegalArgumentException(cause.getMessage(), cause);
+    }
+
+    if (exceptionClassName.contains("MetaException")) {
+      if (lowerMessage.contains("invalid partition key")) {
+        return new NoSuchPartitionException(
+            cause, "Hive partition %s does not exist in Hive Metastore", 
target.name());
+      }
+    }
+
+    if (exceptionClassName.contains("TException")) {
+      if (lowerMessage.contains("already exists")) {
+        return toAlreadyExistsException(cause, target, message);
+      }
+      if (isNotFoundKeyword(lowerMessage)) {
+        return toNoSuchObjectException(cause, target, message);
+      }
+      if (isNonEmptySchemaMessage(lowerMessage)) {
+        return new NonEmptySchemaException(
+            cause, "Hive schema %s is not empty in Hive Metastore", 
target.name());
+      }
+    }
+
+    if (isConnectionKeyword(lowerMessage) || exceptionClassName.contains("TransportException")) {
+      return new ConnectionFailedException(
+          cause, "Failed to connect to Hive Metastore: %s", target.name());
+    }
+
+    if (cause instanceof RuntimeException) {
+      return (RuntimeException) cause;
+    }
+    return new GravitinoRuntimeException(cause, message);
+  }
+
+  private static boolean isNotFoundKeyword(String lowerMessage) {
+    return Stream.of("does not exist", "not found", "no such", "there is no")
+        .anyMatch(lowerMessage::contains);
+  }
+
+  private static boolean isConnectionKeyword(String lowerMessage) {
+    return Stream.of("connection", "connect", "timeout", "network")
+        .anyMatch(lowerMessage::contains);
+  }
+
+  private static boolean isNonEmptySchemaMessage(String lowerMessage) {
+    return (lowerMessage.contains("non-empty") || lowerMessage.contains("not 
empty"))
+        && (lowerMessage.contains("schema") || 
lowerMessage.contains("database"));
+  }
+
+  private static RuntimeException toAlreadyExistsException(
+      Throwable cause, ExceptionTarget target, String rawMessage) {
+    TargetType objectType = target.type();
+    return switch (objectType) {
+      case PARTITION -> new PartitionAlreadyExistsException(
+          cause, "Hive partition %s already exists in Hive Metastore", 
target.name());
+      case TABLE -> new TableAlreadyExistsException(
+          cause, "Hive table %s already exists in Hive Metastore", 
target.name());
+      case SCHEMA -> new SchemaAlreadyExistsException(
+          cause, "Hive schema %s already exists in Hive Metastore", 
target.name());
+      default -> new AlreadyExistsException(cause, "%s", rawMessage);
+    };
+  }
+
+  private static RuntimeException toNoSuchObjectException(
+      Throwable cause, ExceptionTarget target, String rawMessage) {
+    return switch (target.type()) {
+      case PARTITION -> new NoSuchPartitionException(
+          cause, "Hive partition %s does not exist in Hive Metastore", 
target.name());
+      case TABLE -> new NoSuchTableException(
+          cause, "Hive table %s does not exist in Hive Metastore", 
target.name());
+      case SCHEMA -> new NoSuchSchemaException(
+          cause, "Hive schema %s does not exist in Hive Metastore", 
target.name());
+      default -> new NoSuchEntityException(cause, "%s", rawMessage);
+    };
   }
 }
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java
index d780354ef9..dc477129ee 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java
@@ -37,8 +37,6 @@ public abstract class HiveShim {
 
   protected static final String RETRYING_META_STORE_CLIENT_CLASS =
       "org.apache.hadoop.hive.metastore.RetryingMetaStoreClient";
-  protected static final String IMETA_STORE_CLIENT_CLASS =
-      "org.apache.hadoop.hive.metastore.IMetaStoreClient";
   protected static final String HIVE_CONF_CLASS = "org.apache.hadoop.hive.conf.HiveConf";
   protected static final String CONFIGURATION_CLASS = "org.apache.hadoop.conf.Configuration";
   protected static final String METHOD_GET_PROXY = "getProxy";
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV2.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV2.java
new file mode 100644
index 0000000000..383f5a1d85
--- /dev/null
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV2.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE2;
+import static org.apache.gravitino.hive.client.Util.updateConfigurationFromProperties;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Properties;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.gravitino.hive.client.HiveExceptionConverter.ExceptionTarget;
+import org.apache.gravitino.hive.converter.HiveDatabaseConverter;
+import org.apache.gravitino.hive.converter.HiveTableConverter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+
+class HiveShimV2 extends HiveShim {
+
+  HiveShimV2(Properties properties) {
+    super(HIVE2, properties);
+  }
+
+  HiveShimV2(HiveClientClassLoader.HiveVersion version, Properties properties) {
+    super(version, properties);
+  }
+
+  @Override
+  public IMetaStoreClient createMetaStoreClient(Properties properties) {
+    try {
+      ClassLoader classLoader = this.getClass().getClassLoader();
+      Class<?> clientClass = classLoader.loadClass(RETRYING_META_STORE_CLIENT_CLASS);
+      Class<?> hiveConfClass = classLoader.loadClass(HIVE_CONF_CLASS);
+      Class<?> confClass = classLoader.loadClass(CONFIGURATION_CLASS);
+
+      Object conf = confClass.getDeclaredConstructor().newInstance();
+      updateConfigurationFromProperties(properties, (Configuration) conf);
+
+      Constructor<?> hiveConfCtor = hiveConfClass.getConstructor(confClass, Class.class);
+      Object hiveConfInstance = hiveConfCtor.newInstance(conf, hiveConfClass);
+
+      Method getProxyMethod = clientClass.getMethod(METHOD_GET_PROXY, hiveConfClass, boolean.class);
+      return (IMetaStoreClient) getProxyMethod.invoke(null, hiveConfInstance, false);
+
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(
+          e, ExceptionTarget.other("MetaStoreClient"));
+    }
+  }
+
+  @Override
+  public void createDatabase(HiveSchema database) {
+    try {
+      Database db = HiveDatabaseConverter.toHiveDb(database);
+      client.createDatabase(db);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(database.name()));
+    }
+  }
+
+  @Override
+  public HiveSchema getDatabase(String catalogName, String databaseName) {
+    try {
+      Database db = client.getDatabase(databaseName);
+      return HiveDatabaseConverter.fromHiveDB(db);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName));
+    }
+  }
+
+  @Override
+  public void alterDatabase(String catalogName, String databaseName, HiveSchema database) {
+    try {
+      Database db = HiveDatabaseConverter.toHiveDb(database);
+      client.alterDatabase(databaseName, db);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName));
+    }
+  }
+
+  @Override
+  public void dropDatabase(String catalogName, String databaseName, boolean cascade) {
+    try {
+      client.dropDatabase(databaseName, false, false, cascade);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName));
+    }
+  }
+
+  @Override
+  public List<String> getAllTables(String catalogName, String databaseName) {
+    try {
+      return client.getAllTables(databaseName);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName));
+    }
+  }
+
+  @Override
+  public List<String> listTableNamesByFilter(
+      String catalogName, String databaseName, String filter, short pageSize) {
+    try {
+      return client.listTableNamesByFilter(databaseName, filter, pageSize);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName));
+    }
+  }
+
+  @Override
+  public HiveTable getTable(String catalogName, String databaseName, String tableName) {
+    try {
+      var tb = client.getTable(databaseName, tableName);
+      return HiveTableConverter.fromHiveTable(tb);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(tableName));
+    }
+  }
+
+  @Override
+  public void alterTable(
+      String catalogName, String databaseName, String tableName, HiveTable alteredHiveTable) {
+    try {
+      var tb = HiveTableConverter.toHiveTable(alteredHiveTable);
+      client.alter_table(databaseName, tableName, tb);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(tableName));
+    }
+  }
+
+  @Override
+  public void dropTable(
+      String catalogName,
+      String databaseName,
+      String tableName,
+      boolean deleteData,
+      boolean ifPurge) {
+    try {
+      client.dropTable(databaseName, tableName, deleteData, ifPurge);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(tableName));
+    }
+  }
+
+  @Override
+  public void createTable(HiveTable hiveTable) {
+    try {
+      var tb = HiveTableConverter.toHiveTable(hiveTable);
+      client.createTable(tb);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(hiveTable.name()));
+    }
+  }
+
+  @Override
+  public List<String> listPartitionNames(HiveTable table, short pageSize) {
+    try {
+      String databaseName = table.databaseName();
+      return client.listPartitionNames(databaseName, table.name(), pageSize);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(table.name()));
+    }
+  }
+
+  @Override
+  public List<HivePartition> listPartitions(HiveTable table, short pageSize) {
+    try {
+      String databaseName = table.databaseName();
+      var partitions = client.listPartitions(databaseName, table.name(), pageSize);
+      return partitions.stream().map(p -> HiveTableConverter.fromHivePartition(table, p)).toList();
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(table.name()));
+    }
+  }
+
+  @Override
+  public List<HivePartition> listPartitions(
+      HiveTable table, List<String> filterPartitionValueList, short pageSize) {
+    try {
+      String databaseName = table.databaseName();
+      var partitions =
+          client.listPartitions(databaseName, table.name(), filterPartitionValueList, pageSize);
+      return partitions.stream().map(p -> HiveTableConverter.fromHivePartition(table, p)).toList();
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.table(table.name()));
+    }
+  }
+
+  @Override
+  public HivePartition getPartition(HiveTable table, String partitionName) {
+    try {
+      String databaseName = table.databaseName();
+      var partitionValues = HivePartition.extractPartitionValues(partitionName);
+      var partition = client.getPartition(databaseName, table.name(), partitionValues);
+      return HiveTableConverter.fromHivePartition(table, partition);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(
+          e, ExceptionTarget.partition(partitionName));
+    }
+  }
+
+  @Override
+  public HivePartition addPartition(HiveTable table, HivePartition partition) {
+    try {
+      String databaseName = table.databaseName();
+      var hivePartition = HiveTableConverter.toHivePartition(databaseName, table, partition);
+      var addedPartition = client.add_partition(hivePartition);
+      return HiveTableConverter.fromHivePartition(table, addedPartition);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(
+          e, ExceptionTarget.partition(partition.name()));
+    }
+  }
+
+  @Override
+  public void dropPartition(
+      String catalogName,
+      String databaseName,
+      String tableName,
+      String partitionName,
+      boolean deleteData) {
+    try {
+      var partitionValues = HivePartition.extractPartitionValues(partitionName);
+      client.dropPartition(databaseName, tableName, partitionValues, deleteData);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(
+          e, ExceptionTarget.partition(partitionName));
+    }
+  }
+
+  @Override
+  public String getDelegationToken(String finalPrincipalName, String userName) {
+    try {
+      return client.getDelegationToken(finalPrincipalName, userName);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(
+          e, ExceptionTarget.other(finalPrincipalName));
+    }
+  }
+
+  @Override
+  public List<HiveTable> getTableObjectsByName(
+      String catalogName, String databaseName, List<String> allTables) {
+    try {
+      // Hive2 doesn't support catalog, so we ignore catalogName and use databaseName
+      var tables = client.getTableObjectsByName(databaseName, allTables);
+      return tables.stream().map(HiveTableConverter::fromHiveTable).toList();
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName));
+    }
+  }
+
+  @Override
+  public List<String> getCatalogs() {
+    return List.of();
+  }
+
+  @Override
+  public void close() throws Exception {
+    client.close();
+  }
+}
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV3.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV3.java
new file mode 100644
index 0000000000..6409696a41
--- /dev/null
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV3.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE3;
+import static org.apache.gravitino.hive.client.Util.updateConfigurationFromProperties;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Properties;
+import org.apache.commons.lang3.reflect.MethodUtils;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.gravitino.hive.client.HiveExceptionConverter.ExceptionTarget;
+import org.apache.gravitino.hive.converter.HiveDatabaseConverter;
+import org.apache.gravitino.hive.converter.HiveTableConverter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+class HiveShimV3 extends HiveShimV2 {
+
+  private final Method createDatabaseMethod;
+  private final Method getDatabaseMethod;
+  private final Method getAllDatabasesMethod;
+  private final Method alterDatabaseMethod;
+  private final Method dropDatabaseMethod;
+  private final Method getTableMethod;
+  private final Method createTableMethod;
+  private final Method alterTableMethod;
+  private final Method dropTableMethod;
+  private final Method getAllTablesMethod;
+  private final Method listTableNamesByFilterMethod;
+  private final Method listPartitionNamesMethod;
+  private final Method listPartitionsMethod;
+  private final Method listPartitionsWithFilterMethod;
+  private final Method getPartitionMethod;
+  private final Method addPartitionMethod;
+  private final Method dropPartitionMethod;
+  private final Method getTableObjectsByNameMethod;
+  private final Method databaseSetCatalogNameMethod;
+  private final Method getCatalogsMethod;
+  private final Method tableSetCatalogNameMethod;
+  private final Method partitionSetCatalogNameMethod;
+
+  HiveShimV3(Properties properties) {
+    super(HIVE3, properties);
+    try {
+      // Hive3 database methods with catalog support
+      this.createDatabaseMethod =
+          IMetaStoreClient.class.getMethod("createDatabase", Database.class);
+      this.getDatabaseMethod =
+          IMetaStoreClient.class.getMethod("getDatabase", String.class, 
String.class);
+      this.getAllDatabasesMethod =
+          IMetaStoreClient.class.getMethod("getAllDatabases", String.class);
+      this.alterDatabaseMethod =
+          IMetaStoreClient.class.getMethod(
+              "alterDatabase", String.class, String.class, Database.class);
+      this.dropDatabaseMethod =
+          IMetaStoreClient.class.getMethod(
+              "dropDatabase",
+              String.class,
+              String.class,
+              boolean.class,
+              boolean.class,
+              boolean.class);
+
+      // Hive3 table methods with catalog support
+      this.getTableMethod =
+          IMetaStoreClient.class.getMethod("getTable", String.class, 
String.class, String.class);
+      this.createTableMethod =
+          IMetaStoreClient.class.getMethod(
+              "createTable", org.apache.hadoop.hive.metastore.api.Table.class);
+      this.alterTableMethod =
+          IMetaStoreClient.class.getMethod(
+              "alter_table",
+              String.class,
+              String.class,
+              String.class,
+              org.apache.hadoop.hive.metastore.api.Table.class);
+      this.dropTableMethod =
+          IMetaStoreClient.class.getMethod(
+              "dropTable", String.class, String.class, String.class, 
boolean.class, boolean.class);
+      this.getAllTablesMethod =
+          IMetaStoreClient.class.getMethod("getAllTables", String.class, 
String.class);
+      this.listTableNamesByFilterMethod =
+          IMetaStoreClient.class.getMethod(
+              "listTableNamesByFilter", String.class, String.class, 
String.class, int.class);
+
+      // Hive3 partition methods with catalog support (using int for pageSize)
+      this.listPartitionNamesMethod =
+          IMetaStoreClient.class.getMethod(
+              "listPartitionNames", String.class, String.class, String.class, 
int.class);
+      this.listPartitionsMethod =
+          IMetaStoreClient.class.getMethod(
+              "listPartitions", String.class, String.class, String.class, 
int.class);
+      this.listPartitionsWithFilterMethod =
+          IMetaStoreClient.class.getMethod(
+              "listPartitions", String.class, String.class, String.class, 
List.class, int.class);
+      this.getPartitionMethod =
+          IMetaStoreClient.class.getMethod(
+              "getPartition", String.class, String.class, String.class, 
List.class);
+      this.addPartitionMethod =
+          IMetaStoreClient.class.getMethod(
+              "add_partition", 
org.apache.hadoop.hive.metastore.api.Partition.class);
+      this.dropPartitionMethod =
+          IMetaStoreClient.class.getMethod(
+              "dropPartition", String.class, String.class, String.class, 
List.class, boolean.class);
+      // Hive3 getTableObjectsByName with catalog parameter
+      this.getTableObjectsByNameMethod =
+          IMetaStoreClient.class.getMethod(
+              "getTableObjectsByName", String.class, String.class, List.class);
+      this.getCatalogsMethod = IMetaStoreClient.class.getMethod("getCatalogs");
+
+      // SetCatalogName methods for Hive3
+      this.databaseSetCatalogNameMethod =
+          MethodUtils.getAccessibleMethod(Database.class, "setCatalogName", 
String.class);
+      this.tableSetCatalogNameMethod =
+          MethodUtils.getAccessibleMethod(
+              org.apache.hadoop.hive.metastore.api.Table.class, "setCatName", 
String.class);
+      this.partitionSetCatalogNameMethod =
+          MethodUtils.getAccessibleMethod(
+              org.apache.hadoop.hive.metastore.api.Partition.class, "setCatName", String.class);
+
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.other("HiveShimV3"));
+    }
+  }
+
+  @Override
+  public IMetaStoreClient createMetaStoreClient(Properties properties) {
+    try {
+      ClassLoader classLoader = this.getClass().getClassLoader();
+      Class<?> clientClass = classLoader.loadClass(RETRYING_META_STORE_CLIENT_CLASS);
+      Class<?> confClass = classLoader.loadClass(CONFIGURATION_CLASS);
+
+      Object conf = confClass.getDeclaredConstructor().newInstance();
+      updateConfigurationFromProperties(properties, (Configuration) conf);
+
+      Method getProxyMethod = clientClass.getMethod(METHOD_GET_PROXY, confClass, boolean.class);
+      return (IMetaStoreClient) getProxyMethod.invoke(null, conf, false);
+
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(
+          e, ExceptionTarget.other("MetaStoreClient"));
+    }
+  }
+
+  @Override
+  public void createDatabase(HiveSchema database) {
+    Database db = HiveDatabaseConverter.toHiveDb(database);
+    String catalogName = database.catalogName();
+    invoke(ExceptionTarget.other(""), db, databaseSetCatalogNameMethod, 
catalogName);
+    invoke(ExceptionTarget.schema(database.name()), client, 
createDatabaseMethod, db);
+  }
+
+  @Override
+  public List<String> getAllDatabases(String catalogName) {
+    return (List<String>)
+        invoke(ExceptionTarget.catalog(catalogName), client, getAllDatabasesMethod, catalogName);
+  }
+
+  @Override
+  public HiveSchema getDatabase(String catalogName, String databaseName) {
+    Database db =
+        (Database)
+            invoke(
+                ExceptionTarget.schema(databaseName),
+                client,
+                getDatabaseMethod,
+                catalogName,
+                databaseName);
+    return HiveDatabaseConverter.fromHiveDB(db);
+  }
+
+  @Override
+  public void alterDatabase(String catalogName, String databaseName, HiveSchema database) {
+    Database db = HiveDatabaseConverter.toHiveDb(database);
+    invoke(ExceptionTarget.other(""), db, databaseSetCatalogNameMethod, 
catalogName);
+    invoke(
+        ExceptionTarget.schema(databaseName),
+        client,
+        alterDatabaseMethod,
+        catalogName,
+        databaseName,
+        db);
+  }
+
+  @Override
+  public void dropDatabase(String catalogName, String databaseName, boolean cascade) {
+    invoke(
+        ExceptionTarget.schema(databaseName),
+        client,
+        dropDatabaseMethod,
+        catalogName,
+        databaseName,
+        true,
+        false,
+        cascade);
+  }
+
+  @Override
+  public List<String> getAllTables(String catalogName, String databaseName) {
+    return (List<String>)
+        invoke(
+            ExceptionTarget.schema(databaseName),
+            client,
+            getAllTablesMethod,
+            catalogName,
+            databaseName);
+  }
+
+  @Override
+  public List<String> listTableNamesByFilter(
+      String catalogName, String databaseName, String filter, short pageSize) {
+    Object pageSizeArg = convertPageSize(listTableNamesByFilterMethod, 3, pageSize);
+    return (List<String>)
+        invoke(
+            ExceptionTarget.schema(databaseName),
+            client,
+            listTableNamesByFilterMethod,
+            catalogName,
+            databaseName,
+            filter,
+            pageSizeArg);
+  }
+
+  @Override
+  public HiveTable getTable(String catalogName, String databaseName, String tableName) {
+    var tb =
+        (org.apache.hadoop.hive.metastore.api.Table)
+            invoke(
+                ExceptionTarget.table(tableName),
+                client,
+                getTableMethod,
+                catalogName,
+                databaseName,
+                tableName);
+    return HiveTableConverter.fromHiveTable(tb);
+  }
+
+  @Override
+  public void alterTable(
+      String catalogName, String databaseName, String tableName, HiveTable alteredHiveTable) {
+    var tb = HiveTableConverter.toHiveTable(alteredHiveTable);
+    invoke(ExceptionTarget.other(""), tb, tableSetCatalogNameMethod, 
catalogName);
+    invoke(
+        ExceptionTarget.table(tableName),
+        client,
+        alterTableMethod,
+        catalogName,
+        databaseName,
+        tableName,
+        tb);
+  }
+
+  @Override
+  public void dropTable(
+      String catalogName,
+      String databaseName,
+      String tableName,
+      boolean deleteData,
+      boolean ifPurge) {
+    invoke(
+        ExceptionTarget.table(tableName),
+        client,
+        dropTableMethod,
+        catalogName,
+        databaseName,
+        tableName,
+        deleteData,
+        ifPurge);
+  }
+
+  @Override
+  public void createTable(HiveTable hiveTable) {
+    String catalogName = hiveTable.catalogName();
+    var tb = HiveTableConverter.toHiveTable(hiveTable);
+    invoke(ExceptionTarget.other(""), tb, tableSetCatalogNameMethod, 
catalogName);
+    invoke(ExceptionTarget.schema(hiveTable.name()), client, 
createTableMethod, tb);
+  }
+
+  @Override
+  public List<String> listPartitionNames(HiveTable table, short pageSize) {
+    String catalogName = table.catalogName();
+    String databaseName = table.databaseName();
+    Object pageSizeArg = convertPageSize(listPartitionNamesMethod, 3, pageSize);
+    return (List<String>)
+        invoke(
+            ExceptionTarget.table(table.name()),
+            client,
+            listPartitionNamesMethod,
+            catalogName,
+            databaseName,
+            table.name(),
+            pageSizeArg);
+  }
+
+  @Override
+  public List<HivePartition> listPartitions(HiveTable table, short pageSize) {
+    String catalogName = table.catalogName();
+    String databaseName = table.databaseName();
+    Object pageSizeArg = convertPageSize(listPartitionsMethod, 3, pageSize);
+    var partitions =
+        (List<org.apache.hadoop.hive.metastore.api.Partition>)
+            invoke(
+                ExceptionTarget.table(table.name()),
+                client,
+                listPartitionsMethod,
+                catalogName,
+                databaseName,
+                table.name(),
+                pageSizeArg);
+    return partitions.stream().map(p -> HiveTableConverter.fromHivePartition(table, p)).toList();
+  }
+
+  @Override
+  public List<HivePartition> listPartitions(
+      HiveTable table, List<String> filterPartitionValueList, short pageSize) {
+    String catalogName = table.catalogName();
+    String databaseName = table.databaseName();
+    Object pageSizeArg = convertPageSize(listPartitionsWithFilterMethod, 4, pageSize);
+    var partitions =
+        (List<org.apache.hadoop.hive.metastore.api.Partition>)
+            invoke(
+                ExceptionTarget.table(table.name()),
+                client,
+                listPartitionsWithFilterMethod,
+                catalogName,
+                databaseName,
+                table.name(),
+                filterPartitionValueList,
+                pageSizeArg);
+    return partitions.stream().map(p -> HiveTableConverter.fromHivePartition(table, p)).toList();
+  }
+
+  @Override
+  public HivePartition getPartition(HiveTable table, String partitionName) {
+    String catalogName = table.catalogName();
+    String databaseName = table.databaseName();
+    var partitionValues = HivePartition.extractPartitionValues(partitionName);
+    var partition =
+        (org.apache.hadoop.hive.metastore.api.Partition)
+            invoke(
+                ExceptionTarget.partition(partitionName),
+                client,
+                getPartitionMethod,
+                catalogName,
+                databaseName,
+                table.name(),
+                partitionValues);
+    return HiveTableConverter.fromHivePartition(table, partition);
+  }
+
+  @Override
+  public HivePartition addPartition(HiveTable table, HivePartition partition) {
+    String catalogName = table.catalogName();
+    String databaseName = table.databaseName();
+    var hivePartition = HiveTableConverter.toHivePartition(databaseName, table, partition);
+    invoke(ExceptionTarget.other(""), hivePartition, partitionSetCatalogNameMethod, catalogName);
+    var addedPartition =
+        (org.apache.hadoop.hive.metastore.api.Partition)
+            invoke(
+                ExceptionTarget.partition(partition.name()),
+                client,
+                addPartitionMethod,
+                hivePartition);
+    return HiveTableConverter.fromHivePartition(table, addedPartition);
+  }
+
+  @Override
+  public void dropPartition(
+      String catalogName,
+      String databaseName,
+      String tableName,
+      String partitionName,
+      boolean deleteData) {
+    var partitionValues = HivePartition.extractPartitionValues(partitionName);
+    invoke(
+        ExceptionTarget.partition(partitionName),
+        client,
+        dropPartitionMethod,
+        catalogName,
+        databaseName,
+        tableName,
+        partitionValues,
+        deleteData);
+  }
+
+  @Override
+  public List<HiveTable> getTableObjectsByName(
+      String catalogName, String databaseName, List<String> allTables) {
+    var tables =
+        (List<Table>)
+            invoke(
+                ExceptionTarget.schema(databaseName),
+                client,
+                getTableObjectsByNameMethod,
+                catalogName,
+                databaseName,
+                allTables);
+    return tables.stream().map(HiveTableConverter::fromHiveTable).toList();
+  }
+
+  @Override
+  public List<String> getCatalogs() {
+    return (List<String>) invoke(ExceptionTarget.other(""), client, 
getCatalogsMethod);
+  }
+
+  /**
+   * Invokes a method on an object and converts any exception to a Gravitino exception.
+   *
+   * @param target Hive object info used in error messages and exception mapping
+   * @param object The object to invoke the method on
+   * @param method The method to invoke
+   * @param args The arguments to pass to the method
+   * @return The result of the method invocation
+   */
+  private Object invoke(ExceptionTarget target, Object object, Method method, Object... args) {
+    try {
+      return method.invoke(object, args);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, target);
+    }
+  }
+
+  /**
+   * Converts pageSize from short to int if the method parameter expects int type.
+   *
+   * @param method The method to check parameter types
+   * @param paramIndex The index of the pageSize parameter
+   * @param pageSize The pageSize value as short
+   * @return The pageSize as Object (short or int)
+   */
+  private Object convertPageSize(Method method, int paramIndex, short pageSize) {
+    if (method.getParameterTypes()[paramIndex] == int.class) {
+      return (int) pageSize;
+    }
+    return pageSize;
+  }
+
+  @Override
+  public void close() throws Exception {
+    client.close();
+  }
+}
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java
index fd16977363..a75b117142 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java
@@ -27,9 +27,9 @@ public class Util {
 
   public static final String HIVE_CONFIG_RESOURCES = "hive.config.resources";
 
-  public static Configuration buildConfigurationFromProperties(Properties properties) {
+  public static void updateConfigurationFromProperties(
+      Properties properties, Configuration config) {
     try {
-      Configuration config = new Configuration();
       String configResources = properties.getProperty(HIVE_CONFIG_RESOURCES);
       if (StringUtils.isNotBlank(configResources)) {
         for (String resource : configResources.split(",")) {
@@ -41,7 +41,6 @@ public class Util {
       }
 
       properties.forEach((k, v) -> config.set(k.toString(), v.toString()));
-      return config;
     } catch (Exception e) {
       throw new RuntimeException("Failed to create configuration", e);
     }
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveDatabaseConverter.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveDatabaseConverter.java
new file mode 100644
index 0000000000..d3ba428161
--- /dev/null
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveDatabaseConverter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.converter;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.LOCATION;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+
+public class HiveDatabaseConverter {
+  public static HiveSchema fromHiveDB(Database db) {
+    Preconditions.checkArgument(db != null, "Database cannot be null");
+
+    Map<String, String> properties = buildSchemaProperties(db);
+
+    // Get audit info from Hive's Database object. Because Hive's database doesn't store create
+    // time, last modifier and last modified time, we only get creator from Hive's database.
+    AuditInfo.Builder auditInfoBuilder = AuditInfo.builder();
+    Optional.ofNullable(db.getOwnerName()).ifPresent(auditInfoBuilder::withCreator);
+
+    String catalogName = null;
+    try {
+      java.lang.reflect.Method getCatalogNameMethod = db.getClass().getMethod("getCatalogName");
+      catalogName = (String) getCatalogNameMethod.invoke(db);
+    } catch (Exception e) {
+      // Hive2 doesn't have getCatalogName method, catalogName will be null
+    }
+
+    HiveSchema hiveSchema =
+        HiveSchema.builder()
+            .withName(db.getName())
+            .withComment(db.getDescription())
+            .withProperties(properties)
+            .withAuditInfo(auditInfoBuilder.build())
+            .withCatalogName(catalogName)
+            .build();
+    return hiveSchema;
+  }
+
+  /**
+   * Build schema properties from a Hive Database object.
+   *
+   * @param database The Hive Database object.
+   * @return A map of schema properties.
+   */
+  public static Map<String, String> buildSchemaProperties(Database database) {
+    Map<String, String> properties = new HashMap<>(database.getParameters());
+    properties.put(LOCATION, database.getLocationUri());
+    return properties;
+  }
+
+  public static Database toHiveDb(HiveSchema hiveSchema) {
+    Preconditions.checkArgument(hiveSchema != null, "HiveSchema cannot be null");
+    Database hiveDb = new Database();
+
+    hiveDb.setName(hiveSchema.name());
+    Optional.ofNullable(hiveSchema.properties().get(LOCATION)).ifPresent(hiveDb::setLocationUri);
+    Optional.ofNullable(hiveSchema.comment()).ifPresent(hiveDb::setDescription);
+
+    // TODO: Add more privilege info to Hive's Database object after Gravitino supports privilege.
+    hiveDb.setOwnerName(hiveSchema.auditInfo().creator());
+    hiveDb.setOwnerType(PrincipalType.USER);
+
+    Map<String, String> parameters = new HashMap<>(hiveSchema.properties());
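+    // LOCATION is carried on the Database object itself (setLocationUri above), so drop it here.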
+    parameters.remove(LOCATION);
+    hiveDb.setParameters(parameters);
+
+    return hiveDb;
+  }
+}
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java
index f58080c85b..5c7079bf33 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java
@@ -18,14 +18,32 @@
  */
 package org.apache.gravitino.hive.converter;
 
+import static org.apache.gravitino.catalog.hive.HiveConstants.COMMENT;
+import static org.apache.gravitino.catalog.hive.HiveConstants.EXTERNAL;
+import static org.apache.gravitino.catalog.hive.HiveConstants.FORMAT;
+import static org.apache.gravitino.catalog.hive.HiveConstants.INPUT_FORMAT;
+import static org.apache.gravitino.catalog.hive.HiveConstants.LOCATION;
+import static org.apache.gravitino.catalog.hive.HiveConstants.OUTPUT_FORMAT;
+import static org.apache.gravitino.catalog.hive.HiveConstants.SERDE_LIB;
+import static org.apache.gravitino.catalog.hive.HiveConstants.SERDE_NAME;
+import static org.apache.gravitino.catalog.hive.HiveConstants.SERDE_PARAMETER_PREFIX;
+import static org.apache.gravitino.catalog.hive.HiveConstants.TABLE_TYPE;
 import static org.apache.gravitino.rel.expressions.transforms.Transforms.identity;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.gravitino.connector.BaseColumn;
+import org.apache.gravitino.catalog.hive.StorageFormat;
+import org.apache.gravitino.hive.HiveColumn;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveTable;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.expressions.Expression;
@@ -36,12 +54,221 @@ import org.apache.gravitino.rel.expressions.sorts.SortDirection;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.sorts.SortOrders;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
 
 public class HiveTableConverter {
-  public static AuditInfo getAuditInfo(Table table) {
+
+  public static HiveTable fromHiveTable(org.apache.hadoop.hive.metastore.api.Table table) {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
+    AuditInfo auditInfo = HiveTableConverter.getAuditInfo(table);
+
+    Distribution distribution = HiveTableConverter.getDistribution(table);
+
+    SortOrder[] sortOrders = HiveTableConverter.getSortOrders(table);
+
+    Column[] columns = HiveTableConverter.getColumns(table);
+
+    Transform[] partitioning = HiveTableConverter.getPartitioning(table);
+
+    String catalogName = null;
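+    // Hive 3's Table exposes its catalog via getCatName; Hive 2's doesn't, so resolve the method
+    // reflectively. On a Hive 3 client this is equivalent to: catalogName = table.getCatName();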
+    try {
+      java.lang.reflect.Method getCatNameMethod = table.getClass().getMethod("getCatName");
+      catalogName = (String) getCatNameMethod.invoke(table);
+    } catch (Exception e) {
+      // Hive 2 doesn't have a getCatName method, so catalogName stays null.
+    }
+
+    HiveTable hiveTable =
+        HiveTable.builder()
+            .withName(table.getTableName())
+            .withComment(table.getParameters().get(COMMENT))
+            .withProperties(buildTableProperties(table))
+            .withColumns(columns)
+            .withDistribution(distribution)
+            .withSortOrders(sortOrders)
+            .withAuditInfo(auditInfo)
+            .withPartitioning(partitioning)
+            .withCatalogName(catalogName)
+            .withDatabaseName(table.getDbName())
+            .build();
+    return hiveTable;
+  }
+
+  private static Map<String, String> buildTableProperties(
+      org.apache.hadoop.hive.metastore.api.Table table) {
+    Map<String, String> properties = Maps.newHashMap(table.getParameters());
+
+    Optional.ofNullable(table.getTableType()).ifPresent(t -> properties.put(TABLE_TYPE, t));
+
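+    // Flatten storage-descriptor details (location, formats, serde) into the flat property map.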
+    StorageDescriptor sd = table.getSd();
+    properties.put(LOCATION, sd.getLocation());
+    properties.put(INPUT_FORMAT, sd.getInputFormat());
+    properties.put(OUTPUT_FORMAT, sd.getOutputFormat());
+
+    SerDeInfo serdeInfo = sd.getSerdeInfo();
+    Optional.ofNullable(serdeInfo.getName()).ifPresent(name -> properties.put(SERDE_NAME, name));
+    Optional.ofNullable(serdeInfo.getSerializationLib())
+        .ifPresent(lib -> properties.put(SERDE_LIB, lib));
+    Optional.ofNullable(serdeInfo.getParameters())
+        .ifPresent(p -> p.forEach((k, v) -> properties.put(SERDE_PARAMETER_PREFIX + k, v)));
+
+    return properties;
+  }
+
+  public static org.apache.hadoop.hive.metastore.api.Table toHiveTable(HiveTable hiveTable) {
+    Preconditions.checkArgument(hiveTable != null, "HiveTable cannot be null");
+    String dbName = hiveTable.databaseName();
+    Preconditions.checkArgument(dbName != null, "Database name cannot be null");
+
+    org.apache.hadoop.hive.metastore.api.Table table =
+        new org.apache.hadoop.hive.metastore.api.Table();
+
+    table.setTableName(hiveTable.name());
+    table.setDbName(dbName);
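+    // HMS stores the table type as a TableType enum name (e.g. MANAGED_TABLE), hence upper-case.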
+    String tableType =
+        hiveTable.properties().getOrDefault(TABLE_TYPE, String.valueOf(TableType.MANAGED_TABLE));
+    table.setTableType(tableType.toUpperCase());
+
+    List<FieldSchema> partitionFields =
+        hiveTable.partitionFieldNames().stream()
+            .map(fieldName -> buildPartitionKeyField(fieldName, hiveTable))
+            .collect(Collectors.toList());
+    table.setSd(buildStorageDescriptor(hiveTable, partitionFields));
+    table.setParameters(buildTableParameters(hiveTable));
+    table.setPartitionKeys(partitionFields);
+
+    // Set AuditInfo to Hive's Table object. Hive's Table doesn't support setting last modifier
+    // and last modified time, so we only set creator and create time.
+    table.setOwner(hiveTable.auditInfo().creator());
+    table.setCreateTime(Math.toIntExact(hiveTable.auditInfo().createTime().getEpochSecond()));
+
+    return table;
+  }
+
+  private static FieldSchema buildPartitionKeyField(String fieldName, HiveTable table) {
+    Column partitionColumn =
+        Arrays.stream(table.columns())
+            .filter(c -> c.name().equals(fieldName))
+            .findFirst()
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format("Partition column %s does not exist", fieldName)));
+    return new FieldSchema(
+        partitionColumn.name(),
+        HiveDataTypeConverter.CONVERTER
+            .fromGravitino(partitionColumn.dataType())
+            .getQualifiedName(),
+        partitionColumn.comment());
+  }
+
+  private static StorageDescriptor buildStorageDescriptor(
+      HiveTable table, List<FieldSchema> partitionFields) {
+    StorageDescriptor strgDesc = new StorageDescriptor();
+    List<String> partitionKeys =
+        partitionFields.stream().map(FieldSchema::getName).collect(Collectors.toList());
+    strgDesc.setCols(
+        Arrays.stream(table.columns())
+            .filter(c -> !partitionKeys.contains(c.name()))
+            .map(
+                c ->
+                    new FieldSchema(
+                        c.name(),
+                        HiveDataTypeConverter.CONVERTER
+                            .fromGravitino(c.dataType())
+                            .getQualifiedName(),
+                        c.comment()))
+            .collect(Collectors.toList()));
+
+    // `location` must not be null, otherwise it will result in an NPE when calling the HMS
+    // `alterTable` interface
+    Optional.ofNullable(table.properties().get(LOCATION)).ifPresent(strgDesc::setLocation);
+
+    strgDesc.setSerdeInfo(buildSerDeInfo(table));
+    StorageFormat storageFormat =
+        StorageFormat.valueOf(
+            table
+                .properties()
+                .getOrDefault(FORMAT, String.valueOf(StorageFormat.TEXTFILE))
+                .toUpperCase());
+    strgDesc.setInputFormat(storageFormat.getInputFormat());
+    strgDesc.setOutputFormat(storageFormat.getOutputFormat());
+    // Individually specified INPUT_FORMAT and OUTPUT_FORMAT can override the inputFormat and
+    // outputFormat of FORMAT
+    Optional.ofNullable(table.properties().get(INPUT_FORMAT)).ifPresent(strgDesc::setInputFormat);
+    Optional.ofNullable(table.properties().get(OUTPUT_FORMAT)).ifPresent(strgDesc::setOutputFormat);
+
+    if (table.sortOrder() != null && table.sortOrder().length > 0) {
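+      // HMS Order encodes sort direction as an int: 1 for ascending, 0 for descending.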
+      for (SortOrder sortOrder : table.sortOrder()) {
+        String columnName = ((NamedReference.FieldReference) sortOrder.expression()).fieldName()[0];
+        strgDesc.addToSortCols(
+            new Order(columnName, sortOrder.direction() == SortDirection.ASCENDING ? 1 : 0));
+      }
+    }
+
+    if (table.distribution() != null && !Distributions.NONE.equals(table.distribution())) {
+      strgDesc.setBucketCols(
+          Arrays.stream(table.distribution().expressions())
+              .map(t -> ((NamedReference.FieldReference) t).fieldName()[0])
+              .collect(Collectors.toList()));
+      strgDesc.setNumBuckets(table.distribution().number());
+    }
+
+    return strgDesc;
+  }
+
+  private static SerDeInfo buildSerDeInfo(HiveTable table) {
+    SerDeInfo serDeInfo = new SerDeInfo();
+    serDeInfo.setName(table.properties().getOrDefault(SERDE_NAME, table.name()));
+
+    StorageFormat storageFormat =
+        StorageFormat.valueOf(
+            table
+                .properties()
+                .getOrDefault(FORMAT, String.valueOf(StorageFormat.TEXTFILE))
+                .toUpperCase());
+    serDeInfo.setSerializationLib(storageFormat.getSerde());
+    // Individually specified SERDE_LIB can override the serdeLib of FORMAT
+    Optional.ofNullable(table.properties().get(SERDE_LIB))
+        .ifPresent(serDeInfo::setSerializationLib);
+
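+    // Unpack properties prefixed with SERDE_PARAMETER_PREFIX back into plain serde parameters.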
+    table.properties().entrySet().stream()
+        .filter(e -> e.getKey().startsWith(SERDE_PARAMETER_PREFIX))
+        .forEach(
+            e ->
+                serDeInfo.putToParameters(
+                    e.getKey().substring(SERDE_PARAMETER_PREFIX.length()), e.getValue()));
+    return serDeInfo;
+  }
+
+  private static Map<String, String> buildTableParameters(HiveTable table) {
+    Map<String, String> parameters = Maps.newHashMap(table.properties());
+    Optional.ofNullable(table.comment()).ifPresent(c -> parameters.put(COMMENT, c));
+
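+    // HMS checks the "EXTERNAL" table parameter (in addition to tableType) to treat a table as external.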
+    if (TableType.EXTERNAL_TABLE.name().equalsIgnoreCase(table.properties().get(TABLE_TYPE))) {
+      parameters.put(EXTERNAL, "TRUE");
+    } else {
+      parameters.put(EXTERNAL, "FALSE");
+    }
+
+    parameters.remove(LOCATION);
+    parameters.remove(TABLE_TYPE);
+    parameters.remove(INPUT_FORMAT);
+    parameters.remove(OUTPUT_FORMAT);
+    parameters.remove(SERDE_NAME);
+    parameters.remove(SERDE_LIB);
+    parameters.remove(FORMAT);
+    parameters.keySet().removeIf(k -> k.startsWith(SERDE_PARAMETER_PREFIX));
+    return parameters;
+  }
+
+  public static AuditInfo getAuditInfo(org.apache.hadoop.hive.metastore.api.Table table) {
     // Get audit info from Hive's Table object. Because Hive's table doesn't store last modifier
     // and last modified time, we only get creator and create time from Hive's table.
     AuditInfo.Builder auditInfoBuilder = AuditInfo.builder();
@@ -52,7 +279,7 @@ public class HiveTableConverter {
     return auditInfoBuilder.build();
   }
 
-  public static Distribution getDistribution(Table table) {
+  public static Distribution getDistribution(org.apache.hadoop.hive.metastore.api.Table table) {
     StorageDescriptor sd = table.getSd();
     Distribution distribution = Distributions.NONE;
     if (sd.getBucketCols() != null && !sd.getBucketCols().isEmpty()) {
@@ -65,7 +292,7 @@ public class HiveTableConverter {
     return distribution;
   }
 
-  public static SortOrder[] getSortOrders(Table table) {
+  public static SortOrder[] getSortOrders(org.apache.hadoop.hive.metastore.api.Table table) {
     SortOrder[] sortOrders = SortOrders.NONE;
     StorageDescriptor sd = table.getSd();
     if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
@@ -81,15 +308,13 @@ public class HiveTableConverter {
     return sortOrders;
   }
 
-  public static Transform[] getPartitioning(Table table) {
+  public static Transform[] getPartitioning(org.apache.hadoop.hive.metastore.api.Table table) {
     return table.getPartitionKeys().stream()
         .map(p -> identity(p.getName()))
         .toArray(Transform[]::new);
   }
 
-  public static <
-          BUILDER extends BaseColumn.BaseColumnBuilder<BUILDER, COLUMN>, COLUMN extends BaseColumn>
-      Column[] getColumns(Table table, BUILDER columnBuilder) {
+  public static Column[] getColumns(org.apache.hadoop.hive.metastore.api.Table table) {
     StorageDescriptor sd = table.getSd();
     // Collect column names from sd.getCols() to check for duplicates
     Set<String> columnNames =
@@ -99,21 +324,65 @@ public class HiveTableConverter {
             sd.getCols().stream()
                 .map(
                     f ->
-                        columnBuilder
-                            .withName(f.getName())
-                            .withType(HiveDataTypeConverter.CONVERTER.toGravitino(f.getType()))
-                            .withComment(f.getComment())
-                            .build()),
+                        buildColumn(
+                            f.getName(),
+                            HiveDataTypeConverter.CONVERTER.toGravitino(f.getType()),
+                            f.getComment())),
             table.getPartitionKeys().stream()
                 // Filter out partition keys that already exist in sd.getCols()
                 .filter(p -> !columnNames.contains(p.getName()))
                 .map(
                     p ->
-                        columnBuilder
-                            .withName(p.getName())
-                            .withType(HiveDataTypeConverter.CONVERTER.toGravitino(p.getType()))
-                            .withComment(p.getComment())
-                            .build()))
+                        buildColumn(
+                            p.getName(),
+                            HiveDataTypeConverter.CONVERTER.toGravitino(p.getType()),
+                            p.getComment())))
         .toArray(Column[]::new);
   }
+
+  private static Column buildColumn(String name, Type type, String comment) {
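+    // Hive's FieldSchema carries no nullability info, so every converted column is marked nullable.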
+    HiveColumn.Builder builder =
+        HiveColumn.builder().withName(name).withType(type).withNullable(true);
+    if (comment != null) {
+      builder.withComment(comment);
+    }
+    return builder.build();
+  }
+
+  public static HivePartition fromHivePartition(
+      HiveTable table, org.apache.hadoop.hive.metastore.api.Partition partition) {
+    Preconditions.checkArgument(table != null, "Table cannot be null");
+    Preconditions.checkArgument(partition != null, "Partition cannot be null");
+    List<String> partitionColumns = table.partitionFieldNames();
+    String partitionName = FileUtils.makePartName(partitionColumns, partition.getValues());
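+    // makePartName renders the identity partition as "k1=v1/k2=v2", e.g. "year=2024/month=12".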
+    // TODO: support partition properties metadata to get more necessary information
+    return HivePartition.identity(partitionName, partition.getParameters());
+  }
+
+  public static org.apache.hadoop.hive.metastore.api.Partition toHivePartition(
+      String dbName, HiveTable table, HivePartition partition) {
+    Preconditions.checkArgument(dbName != null, "Database name cannot be null");
+    Preconditions.checkArgument(table != null, "Table cannot be null");
+    Preconditions.checkArgument(partition != null, "Partition cannot be null");
+    org.apache.hadoop.hive.metastore.api.Partition hivePartition =
+        new org.apache.hadoop.hive.metastore.api.Partition();
+    hivePartition.setDbName(dbName);
+    hivePartition.setTableName(table.name());
+
+    List<FieldSchema> partitionFields =
+        table.partitionFieldNames().stream()
+            .map(fieldName -> buildPartitionKeyField(fieldName, table))
+            .collect(Collectors.toList());
+    // TODO: support custom serde and location if necessary
+    StorageDescriptor sd = buildStorageDescriptor(table, partitionFields);
+    // The location will be automatically generated by Hive Metastore
+    sd.setLocation(null);
+
+    hivePartition.setSd(sd);
+    hivePartition.setParameters(partition.properties());
+
+    List<String> values = HivePartition.extractPartitionValues(partition.name());
+    hivePartition.setValues(values);
+    return hivePartition;
+  }
 }
diff --git a/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java b/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java
index f0882e35d9..3b9945807b 100644
--- a/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java
+++ b/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java
@@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.gravitino.connector.BaseColumn;
 import org.apache.gravitino.rel.Column;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -53,9 +52,7 @@ public class TestHiveTableConverter {
             new FieldSchema("month", "int", "Month partition"));
     table.setPartitionKeys(partitionKeys);
 
-    // Get columns using a test column builder
-    TestColumnBuilder builder = new TestColumnBuilder();
-    Column[] columns = HiveTableConverter.getColumns(table, builder);
+    Column[] columns = HiveTableConverter.getColumns(table);
 
     // Should have all 5 columns (3 regular + 2 partition)
     assertEquals(5, columns.length);
@@ -90,9 +87,7 @@ public class TestHiveTableConverter {
             );
     table.setPartitionKeys(partitionKeys);
 
-    // Get columns using a test column builder
-    TestColumnBuilder builder = new TestColumnBuilder();
-    Column[] columns = HiveTableConverter.getColumns(table, builder);
+    Column[] columns = HiveTableConverter.getColumns(table);
 
     // Should have only 4 columns (3 regular + 1 unique partition)
     // The duplicates (date and name) should not be added again
@@ -126,9 +121,7 @@ public class TestHiveTableConverter {
             new FieldSchema("col3", "double", "Column 3 partition"));
     table.setPartitionKeys(partitionKeys);
 
-    // Get columns using a test column builder
-    TestColumnBuilder builder = new TestColumnBuilder();
-    Column[] columns = HiveTableConverter.getColumns(table, builder);
+    Column[] columns = HiveTableConverter.getColumns(table);
 
     // Should have only 3 columns (all from regular columns, no duplicates from partition keys)
     assertEquals(3, columns.length);
@@ -154,59 +147,11 @@ public class TestHiveTableConverter {
     // Empty partition keys
     table.setPartitionKeys(Arrays.asList());
 
-    // Get columns using a test column builder
-    TestColumnBuilder builder = new TestColumnBuilder();
-    Column[] columns = HiveTableConverter.getColumns(table, builder);
+    Column[] columns = HiveTableConverter.getColumns(table);
 
     // Should have only 2 columns from regular columns
     assertEquals(2, columns.length);
     assertEquals("id", columns[0].name());
     assertEquals("name", columns[1].name());
   }
-
-  // Test column implementation for testing
-  static class TestColumn extends BaseColumn {
-    private TestColumn() {}
-
-    static class Builder extends BaseColumn.BaseColumnBuilder<Builder, TestColumn> {
-      @Override
-      protected TestColumn internalBuild() {
-        TestColumn column = new TestColumn();
-        // Use reflection to set protected fields
-        try {
-          java.lang.reflect.Field nameField = BaseColumn.class.getDeclaredField("name");
-          nameField.setAccessible(true);
-          nameField.set(column, name);
-
-          java.lang.reflect.Field commentField = BaseColumn.class.getDeclaredField("comment");
-          commentField.setAccessible(true);
-          commentField.set(column, comment);
-
-          java.lang.reflect.Field dataTypeField = BaseColumn.class.getDeclaredField("dataType");
-          dataTypeField.setAccessible(true);
-          dataTypeField.set(column, dataType);
-
-          java.lang.reflect.Field nullableField = BaseColumn.class.getDeclaredField("nullable");
-          nullableField.setAccessible(true);
-          nullableField.set(column, nullable);
-
-          java.lang.reflect.Field autoIncrementField =
-              BaseColumn.class.getDeclaredField("autoIncrement");
-          autoIncrementField.setAccessible(true);
-          autoIncrementField.set(column, autoIncrement);
-
-          java.lang.reflect.Field defaultValueField =
-              BaseColumn.class.getDeclaredField("defaultValue");
-          defaultValueField.setAccessible(true);
-          defaultValueField.set(column, defaultValue);
-        } catch (Exception e) {
-          throw new RuntimeException("Failed to build TestColumn", e);
-        }
-        return column;
-      }
-    }
-  }
-
-  // Test column builder helper
-  static class TestColumnBuilder extends TestColumn.Builder {}
 }
