This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch internal-main
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit a560c0ae83558522214aa24d43997e61e17fec9d
Author: Yuhui <[email protected]>
AuthorDate: Wed Dec 10 16:25:36 2025 +0800

    [5912] feat(catalog-hive): Implement the Hive shim layer to support Hive2/3 
 (#9416)
    
    ### What changes were proposed in this pull request?
    
    Implement the Hive shim layer to support Hive2/3
    
    ### Why are the changes needed?
    
    Fix: #5912
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Add UTs
---
 .../exceptions/GravitinoRuntimeException.java      |  10 +
 build.gradle.kts                                   |  12 +-
 .../gravitino/catalog/hive/HiveConstants.java      |   6 +
 catalogs/hive-metastore-common/build.gradle.kts    |  49 ++-
 .../org/apache/gravitino/hive/HivePartition.java   | 193 ++++++++++
 .../java/org/apache/gravitino/hive/HiveSchema.java |  80 ++++
 .../java/org/apache/gravitino/hive/HiveTable.java  | 132 +++++++
 .../apache/gravitino/hive/client/HiveClient.java   |  91 +++++
 .../hive/client/HiveClientClassLoader.java         | 245 ++++++++++++
 .../gravitino/hive/client/HiveClientFactory.java   | 196 ++++++++++
 .../gravitino/hive/client/HiveClientImpl.java      | 135 +++++++
 .../hive/client/HiveExceptionConverter.java        |  94 +++++
 .../org/apache/gravitino/hive/client/HiveShim.java | 118 ++++++
 .../gravitino/hive/client/ProxyHiveClientImpl.java |  29 ++
 .../org/apache/gravitino/hive/client/Util.java     |  49 +++
 .../gravitino/hive/client/TestHiveClient.java      | 419 +++++++++++++++++++++
 catalogs/hive-metastore2-libs/build.gradle.kts     |  50 +++
 catalogs/hive-metastore3-libs/build.gradle.kts     |  50 +++
 gradle/libs.versions.toml                          |   3 +
 settings.gradle.kts                                |   1 +
 20 files changed, 1945 insertions(+), 17 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/exceptions/GravitinoRuntimeException.java
 
b/api/src/main/java/org/apache/gravitino/exceptions/GravitinoRuntimeException.java
index b8a5f22805..59e9080d3e 100644
--- 
a/api/src/main/java/org/apache/gravitino/exceptions/GravitinoRuntimeException.java
+++ 
b/api/src/main/java/org/apache/gravitino/exceptions/GravitinoRuntimeException.java
@@ -55,4 +55,14 @@ public class GravitinoRuntimeException extends 
RuntimeException {
   public GravitinoRuntimeException(Throwable cause, @FormatString String 
message, Object... args) {
     super(String.format(message, args), cause);
   }
+
+  /**
+   * Constructs a new exception with the specified detail message and cause.
+   *
+   * @param cause the cause.
+   * @param message the detail message.
+   */
+  public GravitinoRuntimeException(Throwable cause, String message) {
+    super(message, cause);
+  }
 }
diff --git a/build.gradle.kts b/build.gradle.kts
index 49a617e4f7..5c21b39c98 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -293,6 +293,12 @@ subprojects {
     return@subprojects
   }
 
+  if (project.path == ":catalogs:hive-metastore2-libs" ||
+    project.path == ":catalogs:hive-metastore3-libs"
+  ) {
+    return@subprojects
+  }
+
   apply(plugin = "jacoco")
   apply(plugin = "maven-publish")
   apply(plugin = "java")
@@ -981,8 +987,8 @@ tasks {
         !it.name.startsWith("iceberg") &&
         !it.name.startsWith("lance") &&
         !it.name.startsWith("spark") &&
+        !it.name.startsWith("hive-metastore") &&
         it.name != "hadoop-common" &&
-        it.name != "hive-metastore-common" &&
         it.name != "integration-test" &&
         it.name != "trino-connector" &&
         it.parent?.name != "bundles" &&
@@ -1015,6 +1021,8 @@ tasks {
         !it.name.startsWith("integration-test") &&
         !it.name.startsWith("spark") &&
         !it.name.startsWith("trino-connector") &&
+        it.name != "hive-metastore2-libs" &&
+        it.name != "hive-metastore3-libs" &&
         it.name != "hive-metastore-common" &&
         it.name != "docs" &&
         it.name != "hadoop-common" &&
@@ -1046,6 +1054,8 @@ tasks {
       ":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig",
       ":catalogs:catalog-lakehouse-paimon:copyLibAndConfig",
       ":catalogs:catalog-model:copyLibAndConfig",
+      ":catalogs:hive-metastore2-libs:copyLibs",
+      ":catalogs:hive-metastore3-libs:copyLibs",
       ":catalogs:catalog-lakehouse-generic:copyLibAndConfig"
     )
   }
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/hive/HiveConstants.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/hive/HiveConstants.java
index 8c88f8b207..d5424141b1 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/hive/HiveConstants.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/hive/HiveConstants.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog.hive;
 public class HiveConstants {
   // Catalog properties
   public static final String METASTORE_URIS = "metastore.uris";
+  public static final String DEFAULT_CATALOG = "default.catalog";
   public static final String CLIENT_POOL_SIZE = "client.pool-size";
   public static final String CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS =
       "client.pool-cache.eviction-interval-ms";
@@ -45,4 +46,9 @@ public class HiveConstants {
   public static final String SERDE_LIB = "serde-lib";
   public static final String SERDE_PARAMETER_PREFIX = "serde.parameter.";
   public static final String TRANSIENT_LAST_DDL_TIME = "transient_lastDdlTime";
+
+  // Hive metastore constants
+  public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
+  public static final String HIVE_FILTER_FIELD_PARAMS = 
"hive_filter_field_params__";
+  public static final String HIVE_METASTORE_TOKEN_SIGNATURE = 
"hive.metastore.token.signature";
 }
diff --git a/catalogs/hive-metastore-common/build.gradle.kts 
b/catalogs/hive-metastore-common/build.gradle.kts
index 539c8291dd..bed4a3de95 100644
--- a/catalogs/hive-metastore-common/build.gradle.kts
+++ b/catalogs/hive-metastore-common/build.gradle.kts
@@ -31,13 +31,34 @@ dependencies {
   implementation(project(":catalogs:catalog-common")) {
     exclude("*")
   }
+  implementation(project(":common")) {
+    exclude("*")
+  }
   implementation(project(":core")) {
     exclude("*")
   }
+  implementation(project(":clients:client-java-runtime", configuration = 
"shadow"))
 
   implementation(libs.caffeine)
   implementation(libs.guava)
-  implementation(libs.hive2.metastore) {
+  implementation(libs.slf4j.api)
+
+  compileOnly(libs.hive2.metastore)
+  compileOnly(libs.immutables.value)
+  compileOnly(libs.lombok)
+
+  annotationProcessor(libs.immutables.value)
+  annotationProcessor(libs.lombok)
+
+  testImplementation(libs.bundles.log4j)
+  testImplementation(libs.commons.collections3)
+  testImplementation(libs.commons.configuration1)
+  testImplementation(libs.datanucleus.core)
+  testImplementation(libs.datanucleus.api.jdo)
+  testImplementation(libs.datanucleus.rdbms)
+  testImplementation(libs.datanucleus.jdo)
+  testImplementation(libs.derby)
+  testImplementation(libs.hive2.metastore) {
     exclude("ant")
     exclude("co.cask.tephra")
     exclude("com.github.joshelser")
@@ -62,23 +83,9 @@ dependencies {
     exclude("org.openjdk.jol")
     exclude("org.slf4j")
   }
-  implementation(libs.hadoop2.common) {
+  testImplementation(libs.hadoop2.common) {
     exclude("*")
   }
-  implementation(libs.slf4j.api)
-
-  compileOnly(libs.immutables.value)
-
-  annotationProcessor(libs.immutables.value)
-
-  testImplementation(libs.bundles.log4j)
-  testImplementation(libs.commons.collections3)
-  testImplementation(libs.commons.configuration1)
-  testImplementation(libs.datanucleus.core)
-  testImplementation(libs.datanucleus.api.jdo)
-  testImplementation(libs.datanucleus.rdbms)
-  testImplementation(libs.datanucleus.jdo)
-  testImplementation(libs.derby)
   testImplementation(libs.hadoop2.auth) {
     exclude("*")
   }
@@ -130,3 +137,13 @@ configurations {
 artifacts {
   add("testArtifacts", testJar)
 }
+
+// Ensure the shaded Hive metastore lib jars exist before compiling this 
module,
+// since compileOnly(project(":catalogs:hive-metastore{2,3}-libs")) puts those
+// jars on the compile classpath and they are produced by the copyDepends 
tasks.
+tasks.named<JavaCompile>("compileJava") {
+  dependsOn(
+    ":catalogs:hive-metastore2-libs:copyDepends",
+    ":catalogs:hive-metastore3-libs:copyDepends"
+  )
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HivePartition.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HivePartition.java
new file mode 100644
index 0000000000..672ebc9a55
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HivePartition.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.partitions.IdentityPartition;
+
+/** Represents a Hive identity partition with helpers to build and parse 
partition specs. */
+public final class HivePartition implements IdentityPartition {
+
+  private static final String PARTITION_NAME_DELIMITER = "/";
+  private static final String PARTITION_VALUE_DELIMITER = "=";
+
+  private final String name;
+  private final String[][] fieldNames;
+  private final Literal<?>[] values;
+  private final Map<String, String> properties;
+
+  private HivePartition(
+      String name, String[][] fieldNames, Literal<?>[] values, Map<String, 
String> properties) {
+    Preconditions.checkArgument(fieldNames != null, "Partition field names 
must not be null");
+    Preconditions.checkArgument(values != null, "Partition values must not be 
null");
+    Preconditions.checkArgument(
+        fieldNames.length == values.length,
+        "Partition field names size %s must equal values size %s",
+        fieldNames.length,
+        values.length);
+    Arrays.stream(fieldNames)
+        .forEach(
+            fn ->
+                Preconditions.checkArgument(
+                    fn.length == 1, "Hive catalog does not support nested 
partition field names"));
+
+    this.fieldNames = fieldNames;
+    this.values = values;
+    this.properties = properties;
+    this.name = StringUtils.isNotEmpty(name) ? name : buildPartitionName();
+    Preconditions.checkArgument(
+        StringUtils.isNotEmpty(this.name), "Partition name must not be null or 
empty");
+  }
+
+  public static HivePartition identity(String[][] fieldNames, Literal<?>[] 
values) {
+    return identity(fieldNames, values, Collections.emptyMap());
+  }
+
+  public static HivePartition identity(
+      String[][] fieldNames, Literal<?>[] values, Map<String, String> 
properties) {
+    return new HivePartition(null, fieldNames, values, properties);
+  }
+
+  public static HivePartition identity(String partitionName) {
+    return identity(partitionName, Collections.emptyMap());
+  }
+
+  public static HivePartition identity(String partitionName, Map<String, 
String> properties) {
+    Preconditions.checkArgument(
+        StringUtils.isNotEmpty(partitionName), "Partition name must not be 
null or empty");
+    String[][] fieldNames = extractPartitionFieldNames(partitionName);
+    Literal<?>[] values =
+        extractPartitionValues(partitionName).stream()
+            .map(Literals::stringLiteral)
+            .toArray(Literal[]::new);
+    return new HivePartition(partitionName, fieldNames, values, properties);
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  public String[][] fieldNames() {
+    return fieldNames;
+  }
+
+  public Literal<?>[] values() {
+    return values;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  private String buildPartitionName() {
+    return IntStream.range(0, fieldNames.length)
+        .mapToObj(idx -> fieldNames[idx][0] + PARTITION_VALUE_DELIMITER + 
values[idx].value())
+        .collect(Collectors.joining(PARTITION_NAME_DELIMITER));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof HivePartition)) {
+      return false;
+    }
+    HivePartition that = (HivePartition) o;
+    return Objects.equals(name, that.name)
+        && Arrays.deepEquals(fieldNames, that.fieldNames)
+        && Arrays.equals(values, that.values)
+        && Objects.equals(properties, that.properties);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Objects.hash(name, properties);
+    result = 31 * result + Arrays.deepHashCode(fieldNames);
+    result = 31 * result + Arrays.hashCode(values);
+    return result;
+  }
+
+  public static List<String> extractPartitionValues(String partitionName) {
+    if (StringUtils.isEmpty(partitionName)) {
+      return Collections.emptyList();
+    }
+    return Arrays.stream(partitionName.split(PARTITION_NAME_DELIMITER))
+        .map(
+            field -> {
+              String[] kv = field.split(PARTITION_VALUE_DELIMITER, 2);
+              return kv.length > 1 ? kv[1] : "";
+            })
+        .collect(Collectors.toList());
+  }
+
+  public static List<String> extractPartitionValues(
+      List<String> partitionFieldNames, String partitionSpec) {
+    Preconditions.checkArgument(
+        partitionFieldNames != null, "Partition field names must not be null");
+    if (partitionFieldNames.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    Map<String, String> partSpecMap = new HashMap<>();
+    if (StringUtils.isNotEmpty(partitionSpec)) {
+      Arrays.stream(partitionSpec.split(PARTITION_NAME_DELIMITER))
+          .forEach(
+              part -> {
+                String[] keyValue = part.split(PARTITION_VALUE_DELIMITER, 2);
+                if (keyValue.length != 2) {
+                  throw new IllegalArgumentException(
+                      String.format("Invalid partition format: %s", 
partitionSpec));
+                }
+                if (!partitionFieldNames.contains(keyValue[0])) {
+                  throw new NoSuchPartitionException(
+                      "Hive partition %s does not exist in Hive Metastore", 
partitionSpec);
+                }
+                partSpecMap.put(keyValue[0], keyValue[1]);
+              });
+    }
+
+    return partitionFieldNames.stream()
+        .map(key -> partSpecMap.getOrDefault(key, ""))
+        .collect(Collectors.toList());
+  }
+
+  public static String[][] extractPartitionFieldNames(String partitionName) {
+    if (StringUtils.isEmpty(partitionName)) {
+      return new String[0][0];
+    }
+    return Arrays.stream(partitionName.split(PARTITION_NAME_DELIMITER))
+        .map(part -> new String[] {part.split(PARTITION_VALUE_DELIMITER, 
2)[0]})
+        .toArray(String[][]::new);
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveSchema.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveSchema.java
new file mode 100644
index 0000000000..eb7b3a2bcf
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveSchema.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive;
+
+import lombok.ToString;
+import org.apache.gravitino.connector.BaseSchema;
+
+/** Represents an Apache Hive Schema (Database) entity in the Hive Metastore 
catalog. */
+@ToString
+public class HiveSchema extends BaseSchema {
+  private String catalogName;
+
+  protected HiveSchema() {}
+
+  public String catalogName() {
+    return catalogName;
+  }
+
+  /** A builder class for constructing HiveSchema instances. */
+  public static class Builder extends BaseSchemaBuilder<Builder, HiveSchema> {
+
+    private String catalogName;
+
+    /** Creates a new instance of {@link Builder}. */
+    private Builder() {}
+
+    /**
+     * Sets the catalog name of the HiveSchema.
+     *
+     * @param catalogName The catalog name of the HiveSchema.
+     * @return The builder instance.
+     */
+    public Builder withCatalogName(String catalogName) {
+      this.catalogName = catalogName;
+      return this;
+    }
+
+    /**
+     * Internal method to build a HiveSchema instance using the provided 
values.
+     *
+     * @return A new HiveSchema instance with the configured values.
+     */
+    @Override
+    protected HiveSchema internalBuild() {
+      HiveSchema hiveSchema = new HiveSchema();
+      hiveSchema.name = name;
+      hiveSchema.comment = comment;
+      hiveSchema.properties = properties;
+      hiveSchema.auditInfo = auditInfo;
+      hiveSchema.catalogName = catalogName;
+
+      return hiveSchema;
+    }
+  }
+
+  /**
+   * Creates a new instance of {@link Builder}.
+   *
+   * @return The new instance.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java
new file mode 100644
index 0000000000..59ec54bfc4
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.COMMENT;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Optional;
+import java.util.Set;
+import lombok.ToString;
+import org.apache.gravitino.catalog.hive.TableType;
+import org.apache.gravitino.connector.BaseTable;
+import org.apache.gravitino.connector.ProxyPlugin;
+import org.apache.gravitino.connector.TableOperations;
+
+/** Represents an Apache Hive Table entity in the Hive Metastore catalog. */
+@ToString
+public class HiveTable extends BaseTable {
+
+  // A set of supported Hive table types.
+  public static final Set<String> SUPPORT_TABLE_TYPES =
+      Sets.newHashSet(TableType.MANAGED_TABLE.name(), 
TableType.EXTERNAL_TABLE.name());
+  public static final String ICEBERG_TABLE_TYPE_VALUE = "ICEBERG";
+  public static final String TABLE_TYPE_PROP = "table_type";
+  private String catalogName;
+  private String databaseName;
+
+  protected HiveTable() {}
+
+  public String catalogName() {
+    return catalogName;
+  }
+
+  public String databaseName() {
+    return databaseName;
+  }
+
+  public void setProxyPlugin(ProxyPlugin plugin) {
+    this.proxyPlugin = Optional.ofNullable(plugin);
+  }
+
+  @Override
+  protected TableOperations newOps() throws UnsupportedOperationException {
+    throw new UnsupportedOperationException();
+  }
+
+  /** A builder class for constructing HiveTable instances. */
+  public static class Builder extends BaseTableBuilder<Builder, HiveTable> {
+
+    private String catalogName;
+    private String databaseName;
+
+    /** Creates a new instance of {@link Builder}. */
+    private Builder() {}
+
+    /**
+     * Sets the catalog name of the HiveTable.
+     *
+     * @param catalogName The catalog name of the HiveTable.
+     * @return This Builder instance.
+     */
+    public Builder withCatalogName(String catalogName) {
+      this.catalogName = catalogName;
+      return this;
+    }
+
+    /**
+     * Sets the Hive schema (database) name to be used for building the 
HiveTable.
+     *
+     * @param databaseName The string database name of the HiveTable.
+     * @return This Builder instance.
+     */
+    public Builder withDatabaseName(String databaseName) {
+      this.databaseName = databaseName;
+      return this;
+    }
+
+    /**
+     * Internal method to build a HiveTable instance using the provided values.
+     *
+     * @return A new HiveTable instance with the configured values.
+     */
+    @Override
+    protected HiveTable internalBuild() {
+      HiveTable hiveTable = new HiveTable();
+      hiveTable.name = name;
+      hiveTable.comment = comment;
+      hiveTable.properties = properties != null ? Maps.newHashMap(properties) 
: Maps.newHashMap();
+      hiveTable.auditInfo = auditInfo;
+      hiveTable.columns = columns;
+      hiveTable.distribution = distribution;
+      hiveTable.sortOrders = sortOrders;
+      hiveTable.partitioning = partitioning;
+      hiveTable.catalogName = catalogName;
+      hiveTable.databaseName = databaseName;
+      hiveTable.proxyPlugin = proxyPlugin;
+
+      // HMS put table comment in parameters
+      if (comment != null) {
+        hiveTable.properties.put(COMMENT, comment);
+      }
+
+      return hiveTable;
+    }
+  }
+
+  /**
+   * Creates a new instance of {@link Builder}.
+   *
+   * @return The new instance.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClient.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClient.java
new file mode 100644
index 0000000000..f7202049d6
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClient.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import java.util.List;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * An externally visible interface to the Hive client. This interface is 
shared across both the
+ * internal and external classloaders for a given version of Hive and thus 
must expose only shared
+ * classes.
+ */
+public interface HiveClient extends AutoCloseable {
+
+  void createDatabase(HiveSchema database);
+
+  HiveSchema getDatabase(String catalogName, String databaseName);
+
+  List<String> getAllDatabases(String catalogName);
+
+  void alterDatabase(String catalogName, String databaseName, HiveSchema 
database);
+
+  void dropDatabase(String catalogName, String databaseName, boolean cascade);
+
+  List<String> getAllTables(String catalogName, String databaseName);
+
+  List<String> listTableNamesByFilter(
+      String catalogName, String databaseName, String filter, short pageSize);
+
+  HiveTable getTable(String catalogName, String databaseName, String 
tableName);
+
+  void alterTable(
+      String catalogName, String databaseName, String tableName, HiveTable 
alteredHiveTable);
+
+  void dropTable(
+      String catalogName,
+      String databaseName,
+      String tableName,
+      boolean deleteData,
+      boolean ifPurge);
+
+  void createTable(HiveTable hiveTable);
+
+  List<String> listPartitionNames(HiveTable table, short pageSize);
+
+  List<HivePartition> listPartitions(HiveTable table, short pageSize);
+
+  List<HivePartition> listPartitions(
+      HiveTable table, List<String> filterPartitionValueList, short pageSize);
+
+  HivePartition getPartition(HiveTable table, String partitionName);
+
+  HivePartition addPartition(HiveTable table, HivePartition partition);
+
+  void dropPartition(
+      String catalogName,
+      String databaseName,
+      String tableName,
+      String partitionName,
+      boolean deleteData);
+
+  String getDelegationToken(String finalPrincipalName, String userName);
+
+  List<HiveTable> getTableObjectsByName(
+      String catalogName, String databaseName, List<String> allTables);
+
+  List<String> getCatalogs();
+
+  void close();
+
+  UserGroupInformation getUser();
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientClassLoader.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientClassLoader.java
new file mode 100644
index 0000000000..0e965f1e52
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientClassLoader.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Isolated client loader for Hive Metastore clients. This class creates an 
isolated classloader
+ * that loads Hive-specific classes from version-specific jar files while 
sharing common classes
+ * with the base classloader.
+ */
+public final class HiveClientClassLoader extends URLClassLoader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveClientClassLoader.class);
+
+  public enum HiveVersion {
+    HIVE2,
+    HIVE3,
+  }
+
+  private final ClassLoader baseClassLoader;
+  private final HiveVersion version;
+
+  /**
+   * Constructs an HiveClientClassLoader.
+   *
+   * @param version The Hive version
+   * @param execJars List of jar file URLs to load
+   * @param baseClassLoader The base classloader for shared classes
+   */
+  private HiveClientClassLoader(
+      HiveVersion version, List<URL> execJars, ClassLoader baseClassLoader) {
+    super(version.toString(), execJars.toArray(new URL[0]), null);
+    Preconditions.checkArgument(version != null, "Hive version cannot be 
null");
+    Preconditions.checkArgument(
+        execJars != null && !execJars.isEmpty(), "Jar URLs cannot be null or 
empty");
+    Preconditions.checkArgument(baseClassLoader != null, "Base classloader 
cannot be null");
+
+    this.version = version;
+    this.baseClassLoader = baseClassLoader;
+  }
+
+  public HiveVersion getHiveVersion() {
+    return version;
+  }
+
+  /**
+   * Creates a new {@link HiveClientClassLoader} instance for the given 
version.
+   *
+   * <p>This method does not perform any caching. Callers are responsible for 
managing and
+   * optionally caching returned instances.
+   *
+   * @param hiveVersion The Hive version to create a loader for.
+   * @param baseLoader The parent classloader to delegate shared classes to.
+   * @return A new {@link HiveClientClassLoader} instance.
+   */
+  public static HiveClientClassLoader createLoader(HiveVersion hiveVersion, 
ClassLoader baseLoader)
+      throws IOException {
+    Path jarDir = getJarDirectory(hiveVersion);
+    if (!Files.exists(jarDir) || !Files.isDirectory(jarDir)) {
+      throw new IOException("Hive jar directory does not exist or is not a 
directory: " + jarDir);
+    }
+
+    List<URL> jars = loadJarUrls(jarDir);
+    if (jars.isEmpty()) {
+      throw new IOException("No jar files found in directory: " + jarDir);
+    }
+
+    return new HiveClientClassLoader(hiveVersion, jars, baseLoader);
+  }
+
+  /**
+   * Gets the jar directory path for the specified Hive version.
+   *
+   * @param version The Hive version
+   * @return The path to the jar directory
+   */
+  private static Path getJarDirectory(HiveVersion version) {
+    String gravitinoHome = System.getenv("GRAVITINO_HOME");
+    Preconditions.checkArgument(StringUtils.isNotEmpty(gravitinoHome), 
"GRAVITINO_HOME not set");
+    boolean testEnv = System.getenv("GRAVITINO_TEST") != null;
+
+    String libsDir = version == HiveVersion.HIVE2 ? "hive-metastore2-libs" : 
"hive-metastore3-libs";
+
+    Path jarDir;
+    if (testEnv) {
+      // In test, hive metastore client jars are under the build directory.
+      jarDir = Paths.get(gravitinoHome, "catalogs", libsDir, "build", "libs");
+    } else {
+      // In production, jars are placed under the hive catalog libs directory.
+      jarDir = Paths.get(gravitinoHome, "catalogs", "hive", "libs", libsDir);
+    }
+
+    if (!Files.exists(jarDir) || !Files.isDirectory(jarDir)) {
+      throw new GravitinoRuntimeException(
+          "Cannot find Hive jar directory for version %s in directory %s", 
version, jarDir);
+    }
+
+    return jarDir.toAbsolutePath();
+  }
+
+  /**
+   * Loads all jar file URLs from the specified directory.
+   *
+   * @param jarDir The directory containing jar files
+   * @return A list of jar file URLs
+   * @throws IOException If an I/O error occurs
+   */
+  private static List<URL> loadJarUrls(Path jarDir) throws IOException {
+    try (var stream = Files.list(jarDir)) {
+      return stream
+          .filter(p -> p.toString().endsWith(".jar"))
+          .map(
+              p -> {
+                try {
+                  return p.toUri().toURL();
+                } catch (Exception e) {
+                  throw new GravitinoRuntimeException(e, "Failed to convert 
path to URL: %s", p);
+                }
+              })
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new IOException("Failed to list jar files in directory: " + 
jarDir.toString(), e);
+    }
+  }
+
+  @Override
+  protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+    Class<?> loaded = findLoadedClass(name);
+    if (loaded != null) {
+      return loaded;
+    }
+    if (isBarrierClass(name)) {
+      return loadBarrierClass(name);
+    } else if (isSharedClass(name)) {
+      return loadSharedClass(name, resolve);
+    } else {
+      LOG.debug("Classloader {} loading isolated class {}", getName(), name);
+      return super.loadClass(name, resolve);
+    }
+  }
+
+  private Class<?> loadBarrierClass(String name) throws ClassNotFoundException 
{
+    LOG.debug("Classloader {} loading barrier class {}", getName(), name);
+    String classFileName = name.replace(".", "/") + ".class";
+    try (InputStream is = baseClassLoader.getResourceAsStream(classFileName)) {
+      if (is == null) {
+        throw new ClassNotFoundException("Cannot load barrier class: " + name);
+      }
+      byte[] bytes = is.readAllBytes();
+      return defineClass(name, bytes, 0, bytes.length);
+    } catch (IOException e) {
+      throw new ClassNotFoundException("Failed to load barrier class: " + 
name, e);
+    }
+  }
+
+  private Class<?> loadSharedClass(String name, boolean resolve) throws 
ClassNotFoundException {
+    LOG.debug("Classloader {} loading shared class {}", getName(), name);
+    try {
+      return baseClassLoader.loadClass(name);
+    } catch (ClassNotFoundException e) {
+      // Fallback to isolated classloader if not found in base
+      return super.loadClass(name, resolve);
+    }
+  }
+
+  /**
+   * Checks if a class should be shared with the base classloader.
+   *
+   * @param name The fully qualified class name
+   * @return true if the class should be shared, false otherwise
+   */
+  private boolean isSharedClass(String name) {
+    // Shared logging classes
+    if (name.startsWith("org.slf4j")
+        || name.startsWith("org.apache.log4j")
+        || name.startsWith("org.apache.logging.log4j")) {
+      return true;
+    }
+
+    // Shared Hadoop classes (excluding Hive-specific ones)
+    if (name.startsWith("org.apache.hadoop.") && 
!name.startsWith("org.apache.hadoop.hive.")) {
+      return true;
+    }
+
+    // Shared Google classes (excluding cloud-specific ones)
+    if (name.startsWith("com.google") && !name.startsWith("com.google.cloud")) 
{
+      return true;
+    }
+
+    // Java standard library classes
+    if (name.startsWith("java.")
+        || name.startsWith("javax.")
+        || name.startsWith("com.sun.")
+        || name.startsWith("org.ietf.jgss.")) {
+      return true;
+    }
+
+    // Gravitino classes
+    if (name.startsWith("org.apache.gravitino.")) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Checks if a class is a barrier class that should be loaded in isolation.
+   *
+   * @param name The fully qualified class name
+   * @return true if the class is a barrier class, false otherwise
+   */
+  private boolean isBarrierClass(String name) {
+    return name.startsWith(HiveClientImpl.class.getName())
+        || name.startsWith(HiveShim.class.getName())
+        || name.startsWith(Util.class.getName())
+        || name.startsWith("org.apache.gravitino.hive.converter.");
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java
new file mode 100644
index 0000000000..1c74ca908f
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import static 
org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_URIS;
+import static 
org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE2;
+import static 
org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE3;
+import static 
org.apache.gravitino.hive.client.Util.buildConfigurationFromProperties;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.Properties;
+import org.apache.commons.lang3.reflect.MethodUtils;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HiveClientFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveClientFactory.class);
+
+  // Remember which Hive backend classloader worked successfully for this 
factory.
+  private volatile HiveClientClassLoader backendClassLoader;
+  private final Object classLoaderLock = new Object();
+
+  @SuppressWarnings("UnusedVariable")
+  private final Configuration hadoopConf;
+
+  private final Properties properties;
+
+  /**
+   * Creates a {@link HiveClientFactory} bound to the given configuration 
properties.
+   *
+   * @param properties Hive client configuration, must not be null.
+   * @param id An identifier for this factory instance.
+   */
+  public HiveClientFactory(Properties properties, String id) {
+    Preconditions.checkArgument(properties != null, "Properties cannot be 
null");
+    this.properties = properties;
+
+    try {
+      this.hadoopConf = buildConfigurationFromProperties(properties);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize HiveClientFactory", e);
+    }
+  }
+
+  public HiveClient createHiveClient() {
+    HiveClientClassLoader classLoader;
+    if (backendClassLoader == null) {
+      synchronized (classLoaderLock) {
+        if (backendClassLoader == null) {
+          // initialize the backend classloader with try connecting to Hive 
metastore
+          return createHiveClientWithBackend();
+        }
+      }
+    }
+    classLoader = backendClassLoader;
+
+    HiveClient client;
+    try {
+      client = createHiveClientInternal(classLoader);
+      LOG.info(
+          "Connected to Hive Metastore using cached Hive version {}", 
classLoader.getHiveVersion());
+      return client;
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to connect to Hive Metastore using cached Hive version {}",
+          classLoader.getHiveVersion(),
+          e);
+      throw new RuntimeException("Failed to connect to Hive Metastore", e);
+    }
+  }
+
+  public HiveClient createHiveClientWithBackend() {
+    HiveClient client = null;
+    HiveClientClassLoader classloader = null;
+    try {
+      // Try using Hive3 first
+      classloader =
+          HiveClientClassLoader.createLoader(HIVE3, 
Thread.currentThread().getContextClassLoader());
+      client = createHiveClientInternal(classloader);
+      client.getCatalogs();
+      LOG.info("Connected to Hive Metastore using Hive version HIVE3");
+      backendClassLoader = classloader;
+      return client;
+
+    } catch (GravitinoRuntimeException e) {
+      try {
+        if (client != null) {
+          client.close();
+        }
+        if (classloader != null) {
+          classloader.close();
+        }
+
+        // Fallback to Hive2 if we can list databases
+        if (e.getMessage().contains("Invalid method name: 'get_catalogs'")
+            || e.getMessage().contains("class not found") // caused by 
MiniHiveMetastoreService
+        ) {
+          classloader =
+              HiveClientClassLoader.createLoader(
+                  HIVE2, Thread.currentThread().getContextClassLoader());
+          client = createHiveClientInternal(classloader);
+          LOG.info("Connected to Hive Metastore using Hive version HIVE2");
+          backendClassLoader = classloader;
+          return client;
+        }
+        throw e;
+
+      } catch (Exception ex) {
+        LOG.error("Failed to connect to Hive Metastore using both Hive3 and 
Hive2", ex);
+        throw e;
+      }
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(
+          e, HiveExceptionConverter.ExceptionTarget.other(""));
+    }
+  }
+
+  public static HiveClient createHiveClientImpl(
+      HiveClientClassLoader.HiveVersion version, Properties properties, 
ClassLoader classloader)
+      throws Exception {
+    Class<?> hiveClientImplClass = 
classloader.loadClass(HiveClientImpl.class.getName());
+    Constructor<?> hiveClientImplCtor =
+        hiveClientImplClass.getConstructor(
+            HiveClientClassLoader.HiveVersion.class, Properties.class);
+    return (HiveClient) hiveClientImplCtor.newInstance(version, properties);
+  }
+
+  public static HiveClient createProxyHiveClientImpl(
+      HiveClientClassLoader.HiveVersion version,
+      Properties properties,
+      UserGroupInformation ugi,
+      ClassLoader classloader)
+      throws Exception {
+    Class<?> hiveClientImplClass = 
classloader.loadClass(ProxyHiveClientImpl.class.getName());
+    Method createMethod =
+        MethodUtils.getAccessibleMethod(
+            hiveClientImplClass,
+            "createClient",
+            HiveClientClassLoader.HiveVersion.class,
+            UserGroupInformation.class,
+            Properties.class);
+    return (HiveClient) createMethod.invoke(null, version, ugi, properties);
+  }
+
+  private HiveClient createHiveClientInternal(HiveClientClassLoader 
classloader) {
+    ClassLoader origLoader = Thread.currentThread().getContextClassLoader();
+    Thread.currentThread().setContextClassLoader(classloader);
+    try {
+      return createHiveClientImpl(classloader.getHiveVersion(), properties, 
classloader);
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(
+          e,
+          HiveExceptionConverter.ExceptionTarget.other(
+              properties.getProperty(HIVE_METASTORE_URIS)));
+    } finally {
+      Thread.currentThread().setContextClassLoader(origLoader);
+    }
+  }
+
+  /** Release resources held by this factory. */
+  public void close() {
+    synchronized (classLoaderLock) {
+      try {
+        if (backendClassLoader != null) {
+          backendClassLoader.close();
+          backendClassLoader = null;
+        }
+
+      } catch (Exception e) {
+        LOG.warn("Failed to close HiveClientFactory", e);
+      }
+    }
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java
new file mode 100644
index 0000000000..310e6eb258
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import java.util.List;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Java version of HiveClientImpl from Spark Hive module. Provides full 
database, table, and
+ * partition operations.
+ */
+public class HiveClientImpl implements HiveClient {
+  @Override
+  public void createDatabase(HiveSchema database) {}
+
+  @Override
+  public HiveSchema getDatabase(String catalogName, String databaseName) {
+    return null;
+  }
+
+  @Override
+  public List<String> getAllDatabases(String catalogName) {
+    return List.of();
+  }
+
+  @Override
+  public void alterDatabase(String catalogName, String databaseName, 
HiveSchema database) {}
+
+  @Override
+  public void dropDatabase(String catalogName, String databaseName, boolean 
cascade) {}
+
+  @Override
+  public List<String> getAllTables(String catalogName, String databaseName) {
+    return List.of();
+  }
+
+  @Override
+  public List<String> listTableNamesByFilter(
+      String catalogName, String databaseName, String filter, short pageSize) {
+    return List.of();
+  }
+
+  @Override
+  public HiveTable getTable(String catalogName, String databaseName, String 
tableName) {
+    return null;
+  }
+
+  @Override
+  public void alterTable(
+      String catalogName, String databaseName, String tableName, HiveTable 
alteredHiveTable) {}
+
+  @Override
+  public void dropTable(
+      String catalogName,
+      String databaseName,
+      String tableName,
+      boolean deleteData,
+      boolean ifPurge) {}
+
+  @Override
+  public void createTable(HiveTable hiveTable) {}
+
+  @Override
+  public List<String> listPartitionNames(HiveTable table, short pageSize) {
+    return List.of();
+  }
+
+  @Override
+  public List<HivePartition> listPartitions(HiveTable table, short pageSize) {
+    return List.of();
+  }
+
+  @Override
+  public List<HivePartition> listPartitions(
+      HiveTable table, List<String> filterPartitionValueList, short pageSize) {
+    return List.of();
+  }
+
+  @Override
+  public HivePartition getPartition(HiveTable table, String partitionName) {
+    return null;
+  }
+
+  @Override
+  public HivePartition addPartition(HiveTable table, HivePartition partition) {
+    return null;
+  }
+
+  @Override
+  public void dropPartition(
+      String catalogName, String databaseName, String tableName, String 
partitionName, boolean b) {}
+
+  @Override
+  public String getDelegationToken(String finalPrincipalName, String userName) 
{
+    return "";
+  }
+
+  @Override
+  public List<HiveTable> getTableObjectsByName(
+      String catalogName, String databaseName, List<String> allTables) {
+    return List.of();
+  }
+
+  @Override
+  public List<String> getCatalogs() {
+    return List.of();
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public UserGroupInformation getUser() {
+    return null;
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java
new file mode 100644
index 0000000000..5e4c70885f
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+/**
+ * Utility class to convert Hive exceptions to Gravitino exceptions. This 
class handles various
+ * types of exceptions that can be thrown by Hive Metastore operations, 
including:
+ *
+ * <ul>
+ *   <li>Reflection exceptions (InvocationTargetException)
+ *   <li>Hive Metastore exceptions (e.g., AlreadyExistsException, 
NoSuchObjectException,
+ *       InvalidOperationException, MetaException)
+ *   <li>Hive Thrift exceptions (TException)
+ *   <li>Other runtime exceptions
+ * </ul>
+ */
+public class HiveExceptionConverter {
+
+  enum TargetType {
+    TABLE,
+    SCHEMA,
+    PARTITION,
+    CATALOG,
+    OTHER
+  }
+
+  /** Represents the target Hive object (name + type) associated with an 
operation. */
+  public static final class ExceptionTarget {
+    private final String name;
+    private final TargetType type;
+
+    private ExceptionTarget(String name, TargetType type) {
+      this.name = name;
+      this.type = type;
+    }
+
+    public static ExceptionTarget table(String name) {
+      return new ExceptionTarget(name, TargetType.TABLE);
+    }
+
+    public static ExceptionTarget schema(String name) {
+      return new ExceptionTarget(name, TargetType.SCHEMA);
+    }
+
+    public static ExceptionTarget catalog(String name) {
+      return new ExceptionTarget(name, TargetType.CATALOG);
+    }
+
+    public static ExceptionTarget partition(String name) {
+      return new ExceptionTarget(name, TargetType.PARTITION);
+    }
+
+    public static ExceptionTarget other(String name) {
+      return new ExceptionTarget(name, TargetType.OTHER);
+    }
+
+    public String name() {
+      return name;
+    }
+
+    public TargetType type() {
+      return type;
+    }
+  }
+
+  private HiveExceptionConverter() {}
+
+  /**
+   * Converts a generic exception to a Gravitino exception with a target Hive 
object.
+   *
+   * @param e The exception to convert
+   * @param target The Hive object related to the operation (table, partition, 
schema, etc.)
+   * @return A Gravitino exception
+   */
+  public static RuntimeException toGravitinoException(Exception e, 
ExceptionTarget target) {
+    return null;
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java
new file mode 100644
index 0000000000..d780354ef9
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.thrift.TException;
+
+/**
+ * Java translation of Scala's `Shim` sealed abstract class.
+ *
+ * <p>This class declares the compatibility layer between Spark and different 
Hive versions.
+ * Concrete subclasses (e.g. HiveShimV2, HiveShimV3 ...) must implement these 
methods according to
+ * the behavior of the corresponding Hive release.
+ */
+public abstract class HiveShim {
+
+  protected static final String RETRYING_META_STORE_CLIENT_CLASS =
+      "org.apache.hadoop.hive.metastore.RetryingMetaStoreClient";
+  protected static final String IMETA_STORE_CLIENT_CLASS =
+      "org.apache.hadoop.hive.metastore.IMetaStoreClient";
+  protected static final String HIVE_CONF_CLASS = 
"org.apache.hadoop.hive.conf.HiveConf";
+  protected static final String CONFIGURATION_CLASS = 
"org.apache.hadoop.conf.Configuration";
+  protected static final String METHOD_GET_PROXY = "getProxy";
+
+  protected final IMetaStoreClient client;
+  protected final HiveClientClassLoader.HiveVersion version;
+
+  protected HiveShim(HiveClientClassLoader.HiveVersion version, Properties 
properties) {
+    this.client = createMetaStoreClient(properties);
+    this.version = version;
+  }
+
+  public abstract IMetaStoreClient createMetaStoreClient(Properties 
properties);
+
+  public List<String> getAllDatabases(String catalogName) {
+    try {
+      return client.getAllDatabases();
+    } catch (TException e) {
+      throw HiveExceptionConverter.toGravitinoException(
+          e, HiveExceptionConverter.ExceptionTarget.catalog(catalogName));
+    }
+  }
+
+  public abstract void createDatabase(HiveSchema database);
+
+  public abstract HiveSchema getDatabase(String catalogName, String 
databaseName);
+
+  public abstract void alterDatabase(String catalogName, String databaseName, 
HiveSchema database);
+
+  public abstract void dropDatabase(String catalogName, String databaseName, 
boolean cascade);
+
+  public abstract List<String> getAllTables(String catalogName, String 
databaseName);
+
+  public abstract List<String> listTableNamesByFilter(
+      String catalogName, String databaseName, String filter, short pageSize);
+
+  public abstract HiveTable getTable(String catalogName, String databaseName, 
String tableName);
+
+  public abstract void alterTable(
+      String catalogName, String databaseName, String tableName, HiveTable 
alteredHiveTable);
+
+  public abstract void dropTable(
+      String catalogName,
+      String databaseName,
+      String tableName,
+      boolean deleteData,
+      boolean ifPurge);
+
+  public abstract void createTable(HiveTable hiveTable);
+
+  public abstract List<String> listPartitionNames(HiveTable table, short 
pageSize);
+
+  public abstract List<HivePartition> listPartitions(HiveTable table, short 
pageSize);
+
+  public abstract List<HivePartition> listPartitions(
+      HiveTable table, List<String> filterPartitionValueList, short pageSize);
+
+  public abstract HivePartition getPartition(HiveTable table, String 
partitionName);
+
+  public abstract HivePartition addPartition(HiveTable table, HivePartition 
partition);
+
+  public abstract void dropPartition(
+      String catalogName, String databaseName, String tableName, String 
partitionName, boolean b);
+
+  public abstract String getDelegationToken(String finalPrincipalName, String 
userName);
+
+  public abstract List<HiveTable> getTableObjectsByName(
+      String catalogName, String databaseName, List<String> allTables);
+
+  public abstract List<String> getCatalogs();
+
+  public void close() throws Exception {
+    if (client != null) {
+      client.close();
+    }
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java
new file mode 100644
index 0000000000..c62c394a04
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+public class ProxyHiveClientImpl implements InvocationHandler {
+  @Override
+  public Object invoke(Object o, Method method, Object[] objects) throws 
Throwable {
+    return null;
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java
new file mode 100644
index 0000000000..fd16977363
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class Util {
+
+  public static final String HIVE_CONFIG_RESOURCES = "hive.config.resources";
+
+  public static Configuration buildConfigurationFromProperties(Properties 
properties) {
+    try {
+      Configuration config = new Configuration();
+      String configResources = properties.getProperty(HIVE_CONFIG_RESOURCES);
+      if (StringUtils.isNotBlank(configResources)) {
+        for (String resource : configResources.split(",")) {
+          resource = resource.trim();
+          if (StringUtils.isNotBlank(resource)) {
+            config.addResource(new Path(resource));
+          }
+        }
+      }
+
+      properties.forEach((k, v) -> config.set(k.toString(), v.toString()));
+      return config;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create configuration", e);
+    }
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/client/TestHiveClient.java
 
b/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/client/TestHiveClient.java
new file mode 100644
index 0000000000..c238f33ce5
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/client/TestHiveClient.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.exceptions.ConnectionFailedException;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+// This class is used for manual testing against real Hive Metastore instances.
+@Disabled
+public class TestHiveClient {
+
+  private static final String HIVE2_HMS_URL = "thrift://172.17.0.4:9083";
+  private static final String HIVE2_HDFS_URL = "hdfs://172.17.0.4:9000";
+  private static final String HIVE3_HMS_URL = "thrift://172.17.0.3:9083";
+  private static final String HIVE3_HDFS_URL = "hdfs://172.17.0.3:9000";
+
+  private static final String KERBEROS_HIVE2_HMS_URL = 
"thrift://172.17.0.2:9083";
+  private static final String KERBEROS_HIVE2_HDFS_URL = 
"hdfs://172.17.0.2:9000";
+  private static final String KERBEROS_PRINCIPAL = "cli@HADOOPKRB";
+  private static final String KERBEROS_KEYTAB = 
"/tmp/test4310082059861441407/client.keytab";
+  private static final String KERBEROS_METASTORE_PRINCIPAL = 
"hive/6b1955fcb754@HADOOPKRB";
+  private static final String KERBEROS_KRB5_CONF = 
"/tmp/test4310082059861441407/krb5.conf";
+
+  @Test
+  void testHive2Client() throws Exception {
+    runHiveClientTest("", "hive2", HIVE2_HMS_URL, HIVE2_HDFS_URL + 
"/tmp/gravitino_test");
+  }
+
+  @Test
+  void testHive3DefaultCatalog() throws Exception {
+    // Hive3 default catalog is "hive", not empty string
+    runHiveClientTest(
+        "hive", "hive3_default", HIVE3_HMS_URL, HIVE3_HDFS_URL + 
"/tmp/gravitino_test");
+  }
+
+  @Test
+  void testHive3SampleCatalog() throws Exception {
+    runHiveClientTest(
+        "sample_catalog", "hive3_sample", HIVE3_HMS_URL, HIVE3_HDFS_URL + 
"/tmp/gravitino_test");
+  }
+
+  private void runHiveClientTest(
+      String catalogName, String testPrefix, String metastoreUri, String 
hdfsBasePath) {
+    Properties properties = new Properties();
+    properties.setProperty("hive.metastore.uris", metastoreUri);
+    HiveClient client = new HiveClientFactory(properties, 
"").createHiveClient();
+
+    String dbName = "gt_" + testPrefix + "_db_" + 
UUID.randomUUID().toString().replace("-", "");
+    String tableName = "gt_" + testPrefix + "_tbl_" + 
UUID.randomUUID().toString().replace("-", "");
+    String partitionValue = "p_" + UUID.randomUUID().toString().replace("-", 
"");
+    String partitionName = "dt=" + partitionValue;
+
+    String dbLocation = hdfsBasePath + "/" + dbName;
+    String tableLocation = hdfsBasePath + "/" + tableName;
+
+    HiveSchema schema = createTestSchema(catalogName, dbName, dbLocation);
+    HiveTable table = createTestTable(catalogName, dbName, tableName, 
tableLocation);
+    HivePartition partition = createTestPartition(partitionName, 
partitionValue);
+
+    try {
+      // Test database operations
+      client.createDatabase(schema);
+      List<String> allDatabases = client.getAllDatabases(catalogName);
+      Assertions.assertTrue(allDatabases.contains(dbName), "Database should be 
in the list");
+
+      HiveSchema loadedDb = client.getDatabase(catalogName, dbName);
+      Assertions.assertNotNull(loadedDb, "Loaded database should not be null");
+      Assertions.assertEquals(dbName, loadedDb.name(), "Database name should 
match");
+      Assertions.assertEquals(
+          schema.comment(), loadedDb.comment(), "Database comment should 
match");
+
+      client.alterDatabase(catalogName, dbName, schema);
+      HiveSchema alteredDb = client.getDatabase(catalogName, dbName);
+      Assertions.assertNotNull(alteredDb, "Altered database should not be 
null");
+
+      // Test table operations
+      client.createTable(table);
+      List<String> allTables = client.getAllTables(catalogName, dbName);
+      Assertions.assertTrue(allTables.contains(tableName), "Table should be in 
the list");
+
+      HiveTable loadedTable = client.getTable(catalogName, dbName, tableName);
+      Assertions.assertNotNull(loadedTable, "Loaded table should not be null");
+      Assertions.assertEquals(tableName, loadedTable.name(), "Table name 
should match");
+      Assertions.assertEquals(table.comment(), loadedTable.comment(), "Table 
comment should match");
+      Assertions.assertEquals(2, loadedTable.columns().length, "Table should 
have 2 columns");
+      Assertions.assertEquals(
+          1, loadedTable.partitioning().length, "Table should have 1 partition 
key");
+
+      client.alterTable(catalogName, dbName, tableName, loadedTable);
+      HiveTable alteredTable = client.getTable(catalogName, dbName, tableName);
+      Assertions.assertNotNull(alteredTable, "Altered table should not be 
null");
+
+      List<String> filteredTables =
+          client.listTableNamesByFilter(catalogName, dbName, "", (short) 10);
+      Assertions.assertTrue(
+          filteredTables.contains(tableName), "Filtered tables should contain 
the table");
+
+      List<HiveTable> tableObjects =
+          client.getTableObjectsByName(catalogName, dbName, 
List.of(tableName));
+      Assertions.assertEquals(1, tableObjects.size(), "Should get exactly one 
table object");
+      Assertions.assertEquals(
+          tableName, tableObjects.get(0).name(), "Table object name should 
match");
+
+      // Test partition operations
+      HivePartition addedPartition = client.addPartition(loadedTable, 
partition);
+      Assertions.assertNotNull(addedPartition, "Added partition should not be 
null");
+      Assertions.assertEquals(partitionName, addedPartition.name(), "Partition 
name should match");
+
+      List<String> partitionNames = client.listPartitionNames(loadedTable, 
(short) 10);
+      Assertions.assertTrue(
+          partitionNames.contains(partitionName), "Partition should be in the 
list");
+
+      List<HivePartition> partitions = client.listPartitions(loadedTable, 
(short) 10);
+      Assertions.assertEquals(1, partitions.size(), "Should have exactly one 
partition");
+      Assertions.assertEquals(
+          partitionName, partitions.get(0).name(), "Partition name should 
match");
+
+      List<HivePartition> filteredPartitions =
+          client.listPartitions(loadedTable, List.of(partitionValue), (short) 
10);
+      Assertions.assertEquals(
+          1, filteredPartitions.size(), "Should have exactly one filtered 
partition");
+
+      HivePartition fetchedPartition = client.getPartition(loadedTable, 
addedPartition.name());
+      Assertions.assertNotNull(fetchedPartition, "Fetched partition should not 
be null");
+      Assertions.assertEquals(
+          partitionName, fetchedPartition.name(), "Fetched partition name 
should match");
+
+      client.dropPartition(catalogName, dbName, tableName, 
addedPartition.name(), true);
+      List<String> partitionNamesAfterDrop = 
client.listPartitionNames(loadedTable, (short) 10);
+      Assertions.assertFalse(
+          partitionNamesAfterDrop.contains(partitionName),
+          "Partition should not be in the list after drop");
+
+      // Test delegation token (may not be available in all environments)
+      try {
+        String token =
+            client.getDelegationToken(
+                System.getProperty("user.name"), 
System.getProperty("user.name"));
+        Assertions.assertNotNull(token, "Delegation token should not be null");
+      } catch (Exception e) {
+        // Delegation token may not be available, this is acceptable
+      }
+
+      // Cleanup
+      client.dropTable(catalogName, dbName, tableName, true, true);
+      List<String> tablesAfterDrop = client.getAllTables(catalogName, dbName);
+      Assertions.assertFalse(
+          tablesAfterDrop.contains(tableName), "Table should not be in the 
list after drop");
+
+      client.dropDatabase(catalogName, dbName, true);
+      List<String> databasesAfterDrop = client.getAllDatabases(catalogName);
+      Assertions.assertFalse(
+          databasesAfterDrop.contains(dbName), "Database should not be in the 
list after drop");
+    } finally {
+      safelyDropTable(client, catalogName, dbName, tableName);
+      safelyDropDatabase(client, catalogName, dbName);
+    }
+  }
+
+  private HiveSchema createTestSchema(String catalogName, String dbName, 
String location) {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(HiveConstants.LOCATION, location);
+    return HiveSchema.builder()
+        .withName(dbName)
+        .withComment("Test schema for HiveClient operations")
+        .withProperties(properties)
+        .withAuditInfo(defaultAudit())
+        .withCatalogName(catalogName)
+        .build();
+  }
+
+  private HiveTable createTestTable(
+      String catalogName, String databaseName, String tableName, String 
location) {
+    Column idColumn = Column.of("id", Types.IntegerType.get(), null, false, 
false, null);
+    Column dtColumn = Column.of("dt", Types.StringType.get());
+    Map<String, String> properties = new HashMap<>();
+    properties.put(HiveConstants.LOCATION, location);
+    return HiveTable.builder()
+        .withName(tableName)
+        .withColumns(new Column[] {idColumn, dtColumn})
+        .withComment("Test table for HiveClient operations")
+        .withProperties(properties)
+        .withAuditInfo(defaultAudit())
+        .withPartitioning(new Transform[] {Transforms.identity("dt")})
+        .withCatalogName(catalogName)
+        .withDatabaseName(databaseName)
+        .build();
+  }
+
+  private HivePartition createTestPartition(String partitionName, String 
value) {
+    HivePartition partition =
+        HivePartition.identity(
+            new String[][] {new String[] {"dt"}},
+            new Literal<?>[] {Literals.stringLiteral(value)},
+            Map.of());
+    Assertions.assertEquals(partitionName, partition.name());
+    return partition;
+  }
+
+  private AuditInfo defaultAudit() {
+    return AuditInfo.builder()
+        .withCreator(System.getProperty("user.name", "gravitino"))
+        .withCreateTime(Instant.now())
+        .build();
+  }
+
+  private void safelyDropTable(
+      HiveClient client, String catalogName, String dbName, String tableName) {
+    try {
+      client.dropTable(catalogName, dbName, tableName, true, true);
+    } catch (Exception ignored) {
+      // ignore cleanup failures
+    }
+  }
+
+  private void safelyDropDatabase(HiveClient client, String catalogName, 
String dbName) {
+    try {
+      client.dropDatabase(catalogName, dbName, true);
+    } catch (Exception ignored) {
+      // ignore cleanup failures
+    }
+  }
+
+  @Test
+  void testHiveExceptionHandling() throws Exception {
+    testHiveExceptionHandlingForVersion("", HIVE2_HMS_URL, HIVE2_HDFS_URL);
+  }
+
+  @Test
+  void testHive3ExceptionHandling() throws Exception {
+    testHiveExceptionHandlingForVersion("hive", HIVE3_HMS_URL, HIVE3_HDFS_URL);
+  }
+
+  private void testHiveExceptionHandlingForVersion(
+      String catalogName, String metastoreUri, String hdfsBasePath) throws 
Exception {
+    Properties properties = new Properties();
+    properties.setProperty("hive.metastore.uris", metastoreUri);
+    HiveClient client = new HiveClientFactory(properties, 
"").createHiveClient();
+
+    String dbName = "gt_exception_test_db_" + 
UUID.randomUUID().toString().replace("-", "");
+    String tableName = "gt_exception_test_tbl_" + 
UUID.randomUUID().toString().replace("-", "");
+    String partitionValue = "p_" + UUID.randomUUID().toString().replace("-", 
"");
+    String partitionName = "dt=" + partitionValue;
+
+    String dbLocation = hdfsBasePath + "/" + dbName;
+    String tableLocation = hdfsBasePath + "/" + tableName;
+
+    HiveSchema schema = createTestSchema(catalogName, dbName, dbLocation);
+    HiveTable table = createTestTable(catalogName, dbName, tableName, 
tableLocation);
+    HivePartition partition = createTestPartition(partitionName, 
partitionValue);
+
+    try {
+      // Test SchemaAlreadyExistsException - create database twice
+      try {
+        client.createDatabase(schema);
+      } catch (GravitinoRuntimeException e) {
+        // If permission error occurs, skip this test
+        if (e.getCause() != null
+            && e.getCause().getMessage() != null
+            && e.getCause().getMessage().contains("Permission denied")) {
+          return; // Skip test if permission denied
+        }
+        throw e;
+      }
+      Assertions.assertThrows(
+          SchemaAlreadyExistsException.class, () -> 
client.createDatabase(schema));
+
+      // Test NoSuchSchemaException - get non-existent database
+      Assertions.assertThrows(
+          NoSuchSchemaException.class,
+          () -> client.getDatabase(catalogName, "non_existent_db_" + 
UUID.randomUUID()));
+
+      // Test TableAlreadyExistsException - create table twice
+      client.createTable(table);
+      Assertions.assertThrows(TableAlreadyExistsException.class, () -> 
client.createTable(table));
+
+      // Test NoSuchTableException - get non-existent table
+      Assertions.assertThrows(
+          NoSuchTableException.class,
+          () -> client.getTable(catalogName, dbName, "non_existent_table_" + 
UUID.randomUUID()));
+
+      // Test PartitionAlreadyExistsException - add partition twice
+      HiveTable loadedTable = client.getTable(catalogName, dbName, tableName);
+      HivePartition addedPartition = client.addPartition(loadedTable, 
partition);
+      Assertions.assertNotNull(addedPartition, "Added partition should not be 
null");
+      Assertions.assertThrows(
+          PartitionAlreadyExistsException.class, () -> 
client.addPartition(loadedTable, partition));
+
+      // Test NoSuchPartitionException - get non-existent partition
+      Assertions.assertThrows(
+          NoSuchPartitionException.class,
+          () -> client.getPartition(loadedTable, "dt=non_existent_partition_" 
+ UUID.randomUUID()));
+
+      // Test NonEmptySchemaException - try to drop database with tables 
(cascade=false)
+      Exception exception =
+          Assertions.assertThrows(
+              Exception.class, () -> client.dropDatabase(catalogName, dbName, 
false));
+      // Hive may throw different exceptions for non-empty database
+      // The converter should handle it appropriately
+      Assertions.assertTrue(
+          exception instanceof NonEmptySchemaException
+              || exception instanceof GravitinoRuntimeException,
+          "Should throw NonEmptySchemaException or GravitinoRuntimeException, 
got: "
+              + exception.getClass().getName());
+
+      // Cleanup
+      client.dropPartition(catalogName, dbName, tableName, 
addedPartition.name(), true);
+      client.dropTable(catalogName, dbName, tableName, true, true);
+      client.dropDatabase(catalogName, dbName, true);
+    } finally {
+      safelyDropTable(client, catalogName, dbName, tableName);
+      safelyDropDatabase(client, catalogName, dbName);
+    }
+  }
+
+  private void testConnectionFailedExceptionForVersion(String catalogName) {
+    // Test with invalid/unreachable Hive Metastore URI
+    String invalidMetastoreUri = "thrift://127.0.0.1:9999";
+    Properties properties = new Properties();
+    properties.setProperty("hive.metastore.uris", invalidMetastoreUri);
+
+    // Connection failure may occur during client creation or operation
+    // Both should be converted to ConnectionFailedException
+    Exception exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () -> {
+              HiveClient client = new HiveClientFactory(properties, 
"").createHiveClient();
+              client.getAllDatabases(catalogName);
+            });
+
+    // Verify the exception is converted to ConnectionFailedException
+    Assertions.assertTrue(
+        exception instanceof ConnectionFailedException,
+        "Should throw ConnectionFailedException, got: " + 
exception.getClass().getName());
+    Assertions.assertNotNull(
+        ((ConnectionFailedException) exception).getCause(), "Exception should 
have a cause");
+  }
+
+  @Test
+  void testConnectionFailedException() throws Exception {
+    // Test with HIVE2
+    testConnectionFailedExceptionForVersion("");
+
+    // Test with HIVE3
+    testConnectionFailedExceptionForVersion("hive");
+  }
+
+  @Test
+  void testKerberosConnection() {
+    // This method can be implemented to test Kerberos authentication with 
Hive Metastore
+    // when a Kerberos-enabled environment is available.
+    Properties properties = new Properties();
+    properties.setProperty("hive.metastore.uris", KERBEROS_HIVE2_HMS_URL);
+    properties.setProperty("authentication.kerberos.principal", 
KERBEROS_PRINCIPAL);
+    properties.setProperty("authentication.impersonation-enable", "true");
+    properties.setProperty("authentication.kerberos.keytab-uri", 
KERBEROS_KEYTAB);
+    properties.setProperty("hive.metastore.kerberos.principal", 
KERBEROS_METASTORE_PRINCIPAL);
+    properties.setProperty("hive.metastore.sasl.enabled", "true");
+    properties.setProperty("hadoop.security.authentication", "kerberos");
+
+    System.setProperty("java.security.krb5.conf", KERBEROS_KRB5_CONF);
+
+    String catalogName = "hive";
+    String dbName = "test_kerberos_db";
+    String dbLocation = KERBEROS_HIVE2_HDFS_URL + 
"/tmp/gravitino_kerberos_test/" + dbName;
+
+    HiveClient client = new HiveClientFactory(properties, 
"00").createHiveClient();
+    HiveSchema schema = createTestSchema(catalogName, dbName, dbLocation);
+    client.createDatabase(schema);
+    List<String> allDatabases = client.getAllDatabases(catalogName);
+    Assertions.assertTrue(allDatabases.contains(dbName), "Database should be 
in the list");
+    client.dropDatabase(catalogName, dbName, true);
+  }
+}
diff --git a/catalogs/hive-metastore2-libs/build.gradle.kts 
b/catalogs/hive-metastore2-libs/build.gradle.kts
new file mode 100644
index 0000000000..f2629d30ca
--- /dev/null
+++ b/catalogs/hive-metastore2-libs/build.gradle.kts
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.gradle.api.publish.maven.tasks.PublishToMavenLocal
+import org.gradle.api.publish.maven.tasks.PublishToMavenRepository
+
+plugins {
+  id("java")
+  id("idea")
+}
+
+dependencies {
+  implementation(libs.hive2.metastore)
+  implementation(libs.hadoop2.common)
+}
+
+tasks {
+  val copyDepends by registering(Copy::class) {
+    from(configurations.runtimeClasspath)
+    into("build/libs")
+  }
+  jar {
+    finalizedBy(copyDepends)
+  }
+
+  register("copyLibs", Copy::class) {
+    dependsOn(copyDepends, "build")
+    from("build/libs")
+    
into("$rootDir/distribution/package/catalogs/hive/libs/hive-metastore2-libs")
+  }
+}
+
+tasks.withType<PublishToMavenLocal>().configureEach { enabled = false }
+tasks.withType<PublishToMavenRepository>().configureEach { enabled = false }
diff --git a/catalogs/hive-metastore3-libs/build.gradle.kts 
b/catalogs/hive-metastore3-libs/build.gradle.kts
new file mode 100644
index 0000000000..918d3ebd9a
--- /dev/null
+++ b/catalogs/hive-metastore3-libs/build.gradle.kts
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.gradle.api.publish.maven.tasks.PublishToMavenLocal
+import org.gradle.api.publish.maven.tasks.PublishToMavenRepository
+
+plugins {
+  id("java")
+  id("idea")
+}
+
+dependencies {
+  implementation(libs.hive3.metastore)
+  implementation(libs.hadoop2.common)
+}
+
+tasks {
+  val copyDepends by registering(Copy::class) {
+    from(configurations.runtimeClasspath)
+    into("build/libs")
+  }
+  jar {
+    finalizedBy(copyDepends)
+  }
+
+  register("copyLibs", Copy::class) {
+    dependsOn(copyDepends, "build")
+    from("build/libs")
+    
into("$rootDir/distribution/package/catalogs/hive/libs/hive-metastore3-libs")
+  }
+}
+
+tasks.withType<PublishToMavenLocal>().configureEach { enabled = false }
+tasks.withType<PublishToMavenRepository>().configureEach { enabled = false }
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 3c4f2cf403..0ac9055b79 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -36,6 +36,7 @@ mockito = "4.11.0"
 airlift-json = "237"
 airlift-resolver = "1.6"
 hive2 = "2.3.9"
+hive3 = "3.1.3"
 hadoop2 = "2.10.2"
 hadoop3 = "3.3.1"
 hadoop3-gcs = "1.9.4-hadoop3"
@@ -187,6 +188,8 @@ hive2-metastore = { group = "org.apache.hive", name = 
"hive-metastore", version.
 hive2-exec = { group = "org.apache.hive", name = "hive-exec", version.ref = 
"hive2"}
 hive2-common = { group = "org.apache.hive", name = "hive-common", version.ref 
= "hive2"}
 hive2-jdbc = { group = "org.apache.hive", name = "hive-jdbc", version.ref = 
"hive2"}
+hive3-metastore = { group = "org.apache.hive", name = "hive-metastore", 
version.ref = "hive3"}
+hive3-common = { group = "org.apache.hive", name = "hive-common", version.ref 
= "hive3"}
 hadoop2-auth = { group = "org.apache.hadoop", name = "hadoop-auth", 
version.ref = "hadoop2" }
 hadoop2-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs", 
version.ref = "hadoop2" }
 hadoop2-hdfs-client = { group = "org.apache.hadoop", name = 
"hadoop-hdfs-client", version.ref = "hadoop2" }
diff --git a/settings.gradle.kts b/settings.gradle.kts
index cde9547aad..d3f622dbab 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -29,6 +29,7 @@ include("api", "common", "core", "server", "server-common")
 include("catalogs:catalog-common")
 include("catalogs:catalog-hive")
 include("catalogs:hive-metastore-common")
+include("catalogs:hive-metastore2-libs", "catalogs:hive-metastore3-libs")
 include("catalogs:catalog-lakehouse-iceberg")
 include("catalogs:catalog-lakehouse-paimon")
 include("catalogs:catalog-lakehouse-hudi")

Reply via email to