Copilot commented on code in PR #9416:
URL: https://github.com/apache/gravitino/pull/9416#discussion_r2601544435


##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE2;
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE3;
+import static org.apache.gravitino.hive.client.Util.buildConfigurationFromProperties;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.Properties;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HiveClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveClientFactory.class);
+
+  // Remember which Hive backend version worked successfully for this factory.
+  private volatile HiveClientClassLoader.HiveVersion backendVersion;
+  private volatile HiveClientClassLoader backendClassLoader;
+
+  @SuppressWarnings("UnusedVariable")
+  private final Configuration hadoopConf;
+
+  private final Properties properties;
+
+  /**
+   * Creates a {@link HiveClientFactory} bound to the given configuration properties.
+   *
+   * @param properties Hive client configuration, must not be null.
+   * @param id An identifier for this factory instance.
+   */
+  public HiveClientFactory(Properties properties, String id) {
+    Preconditions.checkArgument(properties != null, "Properties cannot be null");
+    this.properties = properties;
+
+    try {
+      this.hadoopConf = buildConfigurationFromProperties(properties);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize HiveClientFactory", e);
+    }
+  }
+
+  /** Creates a {@link HiveClient} using the properties associated with this factory. */
+  public HiveClient createHiveClient() {
+    HiveClient client = null;
+    try {
+      if (backendVersion != null) {
+        HiveClientClassLoader classLoader = getOrCreateClassLoader(backendVersion);
+        client = createHiveClientInternal(classLoader);
+        LOG.info("Connected to Hive Metastore using cached Hive version {}", backendVersion.name());
+        return client;
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to connect to Hive Metastore using cached Hive version {}", backendVersion, e);
+      throw new RuntimeException("Failed to connect to Hive Metastore", e);
+    }
+
+    try {
+      // Try using Hive3 first
+      HiveClientClassLoader classloader = getOrCreateClassLoader(HIVE3);
+      client = createHiveClientInternal(classloader);
+      client.getCatalogs();
+      LOG.info("Connected to Hive Metastore using Hive version HIVE3");
+      backendClassLoader = classloader;
+      backendVersion = HiveClientClassLoader.HiveVersion.HIVE3;
+      return client;
+
+    } catch (GravitinoRuntimeException e) {
+      if (client != null) {
+        client.close();
+      }
+
+      try {
+        // Fallback to Hive2 if we can list databases
+        if (e.getMessage().contains("Invalid method name: 'get_catalogs'")
+            || e.getMessage().contains("class not found") // caused by MiniHiveMetastoreService
+        ) {

Review Comment:
   Using string matching on exception messages (e.g., 
`e.getMessage().contains("Invalid method name: 'get_catalogs'")`) is fragile 
and error-prone as exception messages can vary across versions or be localized. 
Consider checking exception types or using more robust detection mechanisms to 
determine if fallback to Hive2 is needed.
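   The point can be sketched concretely. The following is a hedged illustration, not part of the PR: `HiveFallbackDetector` and the class-name string in the test are hypothetical. The idea is to walk the cause chain and compare exception class names rather than message text; with the Thrift transport an unsupported RPC such as `get_catalogs` typically surfaces as a `TApplicationException`, and the class name stays stable across versions where message wording does not.

```java
// Illustrative sketch only: detect the fallback condition by exception
// class name instead of fragile message matching.
final class HiveFallbackDetector {
  private HiveFallbackDetector() {}

  /** Returns true if any throwable in the cause chain has the given class name. */
  static boolean hasCause(Throwable t, String exceptionClassName) {
    // Walk the cause chain; class names survive message rewording and
    // localization, and work across classloader boundaries.
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      if (cur.getClass().getName().equals(exceptionClassName)) {
        return true;
      }
    }
    return false;
  }
}
```

   A caller would test for, e.g., `org.apache.thrift.TApplicationException` in the chain before deciding to fall back to HIVE2.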



##########
catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/client/TestHiveClient.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.exceptions.ConnectionFailedException;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+// This class is use for manual testing against real Hive Metastore instances.
+@Disabled
+public class TestHiveClient {
+
+  private static final String HIVE2_HMS_URL = "thrift://172.17.0.4:9083";
+  private static final String HIVE2_HDFS_URL = "hdfs://172.17.0.4:9000";
+  private static final String HIVE3_HMS_URL = "thrift://172.17.0.3:9083";
+  private static final String HIVE3_HDFS_URL = "hdfs://172.17.0.3:9000";
+
+  private static final String KERBEROS_HIVE2_HMS_URL = "thrift://172.17.0.2:9083";
+  private static final String KERBEROS_HIVE2_HDFS_URL = "hdfs://172.17.0.2:9000";
+

Review Comment:
   Hardcoded IP addresses and port numbers in test constants make the tests 
environment-specific and non-portable. Consider using configurable properties, 
environment variables, or test containers to make these tests more maintainable 
and executable in different environments.
   ```suggestion
     private static final String HIVE2_HMS_URL =
         System.getenv().getOrDefault("HIVE2_HMS_URL", "thrift://172.17.0.4:9083");
     private static final String HIVE2_HDFS_URL =
         System.getenv().getOrDefault("HIVE2_HDFS_URL", "hdfs://172.17.0.4:9000");
     private static final String HIVE3_HMS_URL =
         System.getenv().getOrDefault("HIVE3_HMS_URL", "thrift://172.17.0.3:9083");
     private static final String HIVE3_HDFS_URL =
         System.getenv().getOrDefault("HIVE3_HDFS_URL", "hdfs://172.17.0.3:9000");
     private static final String KERBEROS_HIVE2_HMS_URL =
         System.getenv().getOrDefault("KERBEROS_HIVE2_HMS_URL", "thrift://172.17.0.2:9083");
     private static final String KERBEROS_HIVE2_HDFS_URL =
         System.getenv().getOrDefault("KERBEROS_HIVE2_HDFS_URL", "hdfs://172.17.0.2:9000");
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE2;
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE3;
+import static org.apache.gravitino.hive.client.Util.buildConfigurationFromProperties;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.Properties;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HiveClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveClientFactory.class);
+
+  // Remember which Hive backend version worked successfully for this factory.
+  private volatile HiveClientClassLoader.HiveVersion backendVersion;
+  private volatile HiveClientClassLoader backendClassLoader;
+
+  @SuppressWarnings("UnusedVariable")
+  private final Configuration hadoopConf;
+
+  private final Properties properties;
+
+  /**
+   * Creates a {@link HiveClientFactory} bound to the given configuration properties.
+   *
+   * @param properties Hive client configuration, must not be null.
+   * @param id An identifier for this factory instance.
+   */
+  public HiveClientFactory(Properties properties, String id) {
+    Preconditions.checkArgument(properties != null, "Properties cannot be null");
+    this.properties = properties;
+
+    try {
+      this.hadoopConf = buildConfigurationFromProperties(properties);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize HiveClientFactory", e);
+    }
+  }
+
+  /** Creates a {@link HiveClient} using the properties associated with this factory. */
+  public HiveClient createHiveClient() {
+    HiveClient client = null;
+    try {
+      if (backendVersion != null) {
+        HiveClientClassLoader classLoader = getOrCreateClassLoader(backendVersion);
+        client = createHiveClientInternal(classLoader);
+        LOG.info("Connected to Hive Metastore using cached Hive version {}", backendVersion.name());
+        return client;
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to connect to Hive Metastore using cached Hive version {}", backendVersion, e);
+      throw new RuntimeException("Failed to connect to Hive Metastore", e);
+    }
+
+    try {
+      // Try using Hive3 first
+      HiveClientClassLoader classloader = getOrCreateClassLoader(HIVE3);
+      client = createHiveClientInternal(classloader);
+      client.getCatalogs();
+      LOG.info("Connected to Hive Metastore using Hive version HIVE3");
+      backendClassLoader = classloader;
+      backendVersion = HiveClientClassLoader.HiveVersion.HIVE3;
+      return client;
+
+    } catch (GravitinoRuntimeException e) {
+      if (client != null) {
+        client.close();
+      }
+
+      try {
+        // Fallback to Hive2 if we can list databases
+        if (e.getMessage().contains("Invalid method name: 'get_catalogs'")
+            || e.getMessage().contains("class not found") // caused by MiniHiveMetastoreService
+        ) {
+          HiveClientClassLoader classloader = getOrCreateClassLoader(HIVE2);
+          client = createHiveClientInternal(classloader);
+          LOG.info("Connected to Hive Metastore using Hive version HIVE2");
+          backendClassLoader = classloader;
+          backendVersion = HIVE2;
+          return client;

Review Comment:
   If the client creation succeeds but `client.getCatalogs()` fails (line 86), 
the client is closed (line 94). However, the client variable is then reused in 
the HIVE2 fallback path (line 103), which could cause issues. Additionally, if 
the HIVE2 fallback also fails, the originally closed HIVE3 client reference is 
lost. Consider using separate variable names for HIVE3 and HIVE2 clients or 
ensuring proper cleanup in all failure paths.
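   One way to realize this suggestion, sketched here with stand-in types (`VersionProbe` and its inner `Client` interface are illustrative, not the PR's API): keep the HIVE3 and HIVE2 clients in distinct variables, and close the HIVE3 client on its own failure path before the fallback client is created, so a closed client can never be reused.

```java
import java.util.function.Supplier;

// Illustrative sketch: version probing with separate client variables and
// cleanup on every failure path.
final class VersionProbe {
  interface Client extends AutoCloseable {
    void probe() throws Exception; // stands in for getCatalogs()

    @Override
    void close(); // narrowed: no checked exception
  }

  private VersionProbe() {}

  static Client connect(Supplier<Client> hive3Factory, Supplier<Client> hive2Factory) {
    Client hive3 = hive3Factory.get();
    try {
      hive3.probe(); // only a HIVE3 metastore answers the catalog RPC
      return hive3;
    } catch (Exception probeFailure) {
      hive3.close(); // release the HIVE3 client before falling back
      // Distinct variable: the closed hive3 reference is never reused,
      // and a failure creating hive2 leaks nothing.
      return hive2Factory.get();
    }
  }
}
```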



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+/**
+ * Utility class to convert Hive exceptions to Gravitino exceptions. This class handles various
+ * types of exceptions that can be thrown by Hive Metastore operations, including:
+ *
+ * <ul>
+ *   <li>Reflection exceptions (InvocationTargetException)
+ *   <Li>Hive Metastore exceptions (e.g., AlreadyExistsException, NoSuchObjectException,
+ *       InvalidOperationException, MetaException)
+ *   <li>Hive Thrift exceptions (TException)
+ *   <li>Other runtime exceptions
+ * </ul>
+ */
+public class HiveExceptionConverter {
+
+  enum TargetType {
+    TABLE,
+    SCHEMA,
+    PARTITION,
+    CATALOG,
+    OTHER
+  }
+
+  /** Represents the target Hive object (name + type) associated with an operation. */
+  public static final class ExceptionTarget {
+    private final String name;
+    private final TargetType type;
+
+    private ExceptionTarget(String name, TargetType type) {
+      this.name = name;
+      this.type = type;
+    }
+
+    public static ExceptionTarget table(String name) {
+      return new ExceptionTarget(name, TargetType.TABLE);
+    }
+
+    public static ExceptionTarget schema(String name) {
+      return new ExceptionTarget(name, TargetType.SCHEMA);
+    }
+
+    public static ExceptionTarget catalog(String name) {
+      return new ExceptionTarget(name, TargetType.CATALOG);
+    }
+
+    public static ExceptionTarget partition(String name) {
+      return new ExceptionTarget(name, TargetType.PARTITION);
+    }
+
+    public static ExceptionTarget other(String name) {
+      return new ExceptionTarget(name, TargetType.OTHER);
+    }
+
+    public String name() {
+      return name;
+    }
+
+    public TargetType type() {
+      return type;
+    }
+  }
+
+  private HiveExceptionConverter() {}
+
+  /**
+   * Converts a generic exception to a Gravitino exception with a target Hive object.
+   *
+   * @param e The exception to convert
+   * @param target The Hive object related to the operation (table, partition, schema, etc.)
+   * @return A Gravitino exception
+   */
+  public static RuntimeException toGravitinoException(Exception e, ExceptionTarget target) {
+    return null;
+  }

Review Comment:
   The `toGravitinoException()` method returns `null`, which will cause 
NullPointerExceptions when exception conversion is attempted. This method is 
called throughout the codebase (e.g., in `HiveClientFactory` lines 116, 170 and 
`HiveShim` line 60). It must properly convert Hive exceptions to Gravitino 
exceptions according to the documentation.
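   A minimal sketch of the conversion shape this comment asks for, with stand-in exception types: the real Gravitino exceptions (`TableAlreadyExistsException`, `NoSuchTableException`, etc.) would replace the generic JDK ones used here, and the class-name switch is only illustrative. The sketch unwraps reflective wrappers first, then maps the underlying Hive exception to an unchecked type.

```java
import java.lang.reflect.InvocationTargetException;

// Hedged sketch only: maps Hive exception class names to unchecked exceptions.
final class ExceptionConverterSketch {
  private ExceptionConverterSketch() {}

  static RuntimeException convert(Exception e, String targetName) {
    Throwable cause = e;
    // Reflective calls wrap the real failure in InvocationTargetException.
    if (cause instanceof InvocationTargetException && cause.getCause() != null) {
      cause = cause.getCause();
    }
    // Match on the class name so Hive2/Hive3 variants of the same exception
    // (loaded by different classloaders) convert identically.
    switch (cause.getClass().getSimpleName()) {
      case "AlreadyExistsException":
        return new IllegalStateException(targetName + " already exists", cause);
      case "NoSuchObjectException":
        return new IllegalStateException(targetName + " does not exist", cause);
      default:
        return cause instanceof RuntimeException
            ? (RuntimeException) cause
            : new RuntimeException(cause);
    }
  }
}
```

   Whatever the final mapping, the method must never return `null`; every branch should produce a throwable for the caller to raise.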



##########
catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/client/TestHiveClient.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.exceptions.ConnectionFailedException;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+// This class is use for manual testing against real Hive Metastore instances.

Review Comment:
   Grammar error: "is use" should be "is used".
   ```suggestion
   // This class is used for manual testing against real Hive Metastore instances.
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+public class ProxyHiveClientImpl implements InvocationHandler {
+  @Override
+  public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
+    return null;

Review Comment:
   The `ProxyHiveClientImpl.invoke()` method always returns `null`, which will 
cause NullPointerExceptions when any method is invoked through this proxy. This 
class is referenced in `HiveClientFactory.createProxyHiveClientImpl()` and 
needs a proper implementation to delegate calls to the underlying Hive client 
with appropriate user context.
   ```suggestion
     private final Object hiveClient;
   
     public ProxyHiveClientImpl(Object hiveClient) {
       this.hiveClient = hiveClient;
     }
   
     @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
       // If user context/impersonation is needed, add logic here.
       return method.invoke(hiveClient, args);
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE2;
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE3;
+import static org.apache.gravitino.hive.client.Util.buildConfigurationFromProperties;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.Properties;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HiveClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveClientFactory.class);
+
+  // Remember which Hive backend version worked successfully for this factory.
+  private volatile HiveClientClassLoader.HiveVersion backendVersion;
+  private volatile HiveClientClassLoader backendClassLoader;
+
+  @SuppressWarnings("UnusedVariable")
+  private final Configuration hadoopConf;
+
+  private final Properties properties;
+
+  /**
+   * Creates a {@link HiveClientFactory} bound to the given configuration properties.
+   *
+   * @param properties Hive client configuration, must not be null.
+   * @param id An identifier for this factory instance.
+   */
+  public HiveClientFactory(Properties properties, String id) {

Review Comment:
   The `id` parameter in the constructor is documented but never used in the 
implementation. Either use the parameter (e.g., for logging or tracking), 
remove it, or clarify in the documentation that it's reserved for future use.
   ```suggestion
      */
     public HiveClientFactory(Properties properties) {
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/Util.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class Util {
+
+  public static final String HIVE_CONFIG_RESOURCES = "hive.config.resources";
+
+  public static Method findMethod(Class<?> klass, String name, Class<?>... args)
+      throws NoSuchMethodException {
+    return klass.getMethod(name, args);
+  }
+
+  protected static Method findStaticMethod(Class<?> klass, String name, Class<?>... args)
+      throws NoSuchMethodException {
+    Method method = findMethod(klass, name, args);
+
+    if (!Modifier.isStatic(method.getModifiers())) {
+      throw new IllegalArgumentException(
+          "Method " + name + " of class " + klass.getName() + " is not static.");
+    }
+    return method;
+  }
+
+  public static Configuration buildConfigurationFromProperties(Properties properties) {
+    try {
+      Configuration config = new Configuration();
+      String hdfsConfigResources = properties.getProperty(HIVE_CONFIG_RESOURCES);
+      if (StringUtils.isNotBlank(hdfsConfigResources)) {
+        for (String resource : hdfsConfigResources.split(",")) {

Review Comment:
   Variable name `hdfsConfigResources` is misleading as it's retrieving a 
property named `HIVE_CONFIG_RESOURCES` which could include various Hive/Hadoop 
configuration files, not just HDFS-specific resources. Consider renaming to 
`hiveConfigResources` or `configResources` to better reflect its purpose.
   ```suggestion
      String hiveConfigResources = properties.getProperty(HIVE_CONFIG_RESOURCES);
         if (StringUtils.isNotBlank(hiveConfigResources)) {
           for (String resource : hiveConfigResources.split(",")) {
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClient.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import java.util.List;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * An externally visible interface to the Hive client. This interface is shared across both the
+ * internal and external classloaders for a given version of Hive and thus must expose only shared
+ * classes.
+ */
+public interface HiveClient extends AutoCloseable {
+
+  void createDatabase(HiveSchema database);
+
+  HiveSchema getDatabase(String catalogName, String databaseName);
+
+  List<String> getAllDatabases(String catalogName);
+
+  void alterDatabase(String catalogName, String databaseName, HiveSchema database);
+
+  void dropDatabase(String catalogName, String databaseName, boolean cascade);
+
+  List<String> getAllTables(String catalogName, String databaseName);
+
+  List<String> listTableNamesByFilter(
+      String catalogName, String databaseName, String filter, short pageSize);
+
+  HiveTable getTable(String catalogName, String databaseName, String tableName);
+
+  void alterTable(
+      String catalogName, String databaseName, String tableName, HiveTable alteredHiveTable);
+
+  void dropTable(
+      String catalogName,
+      String databaseName,
+      String tableName,
+      boolean deleteData,
+      boolean ifPurge);
+
+  void createTable(HiveTable hiveTable);
+
+  List<String> listPartitionNames(HiveTable table, short pageSize);
+
+  List<HivePartition> listPartitions(HiveTable table, short pageSize);
+
+  List<HivePartition> listPartitions(
+      HiveTable table, List<String> filterPartitionValueList, short pageSize);
+
+  HivePartition getPartition(HiveTable table, String partitionName);
+
+  HivePartition addPartition(HiveTable table, HivePartition partition);
+
+  void dropPartition(
+      String catalogName, String databaseName, String tableName, String partitionName, boolean b);

Review Comment:
   The parameter name `b` is non-descriptive. It should be renamed to something 
meaningful like `deleteData` or `purge` to indicate its purpose. Looking at 
similar methods (e.g., line 54-59 `dropTable`), this parameter appears to 
control purge behavior.
   ```suggestion
        String catalogName, String databaseName, String tableName, String partitionName, boolean deleteData);
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import java.util.List;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Java version of HiveClientImpl from Spark Hive module. Provides full database, table, and
+ * partition operations.
+ */
+public class HiveClientImpl implements HiveClient {
+  @Override
+  public void createDatabase(HiveSchema database) {}
+
+  @Override
+  public HiveSchema getDatabase(String catalogName, String databaseName) {
+    return null;
+  }
+
+  @Override
+  public List<String> getAllDatabases(String catalogName) {
+    return List.of();
+  }
+
+  @Override
+  public void alterDatabase(String catalogName, String databaseName, HiveSchema database) {}
+
+  @Override
+  public void dropDatabase(String catalogName, String databaseName, boolean cascade) {}
+
+  @Override
+  public List<String> getAllTables(String catalogName, String databaseName) {
+    return List.of();
+  }
+
+  @Override
+  public List<String> listTableNamesByFilter(
+      String catalogName, String databaseName, String filter, short pageSize) {
+    return List.of();
+  }
+
+  @Override
+  public HiveTable getTable(String catalogName, String databaseName, String tableName) {
+    return null;
+  }
+
+  @Override
+  public void alterTable(
+      String catalogName, String databaseName, String tableName, HiveTable alteredHiveTable) {}
+
+  @Override
+  public void dropTable(
+      String catalogName,
+      String databaseName,
+      String tableName,
+      boolean deleteData,
+      boolean ifPurge) {}
+
+  @Override
+  public void createTable(HiveTable hiveTable) {}
+
+  @Override
+  public List<String> listPartitionNames(HiveTable table, short pageSize) {
+    return List.of();
+  }
+
+  @Override
+  public List<HivePartition> listPartitions(HiveTable table, short pageSize) {
+    return List.of();
+  }
+
+  @Override
+  public List<HivePartition> listPartitions(
+      HiveTable table, List<String> filterPartitionValueList, short pageSize) {
+    return List.of();
+  }
+
+  @Override
+  public HivePartition getPartition(HiveTable table, String partitionName) {
+    return null;
+  }
+
+  @Override
+  public HivePartition addPartition(HiveTable table, HivePartition partition) {
+    return null;
+  }
+
+  @Override
+  public void dropPartition(
+      String catalogName, String databaseName, String tableName, String partitionName, boolean b) {}
+
+  @Override
+  public String getDelegationToken(String finalPrincipalName, String userName) {
+    return "";
+  }
+
+  @Override
+  public List<HiveTable> getTableObjectsByName(
+      String catalogName, String databaseName, List<String> allTables) {
+    return List.of();
+  }
+
+  @Override
+  public List<String> getCatalogs() {
+    return List.of();
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public UserGroupInformation getUser() {
+    return null;
+  }
+}

Review Comment:
   The `HiveClientImpl` class contains only stub implementations that return `null` or empty collections. This is a non-functional implementation that will cause NullPointerExceptions or incorrect behavior when used. All methods need proper implementations that interact with the Hive Metastore via the shim layer. If this is intentional as a work-in-progress, it should be clearly documented as such.
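
   If the stubs are intentional scaffolding, one option (illustrative only; the class and method names below are hypothetical, not this PR's code) is to fail fast instead of returning `null`, so a caller gets a clear error at the call site rather than a NullPointerException somewhere later:

   ```java
   // Sketch of a fail-fast stub pattern for work-in-progress implementations.
   public class FailFastStubDemo {
     interface Client {
       String getTable(String databaseName, String tableName);
     }

     // Returning null defers the failure to an arbitrary later point;
     // throwing UnsupportedOperationException surfaces it immediately.
     static class StubClient implements Client {
       @Override
       public String getTable(String databaseName, String tableName) {
         throw new UnsupportedOperationException(
             "getTable is not implemented yet; work in progress");
       }
     }

     public static void main(String[] args) {
       try {
         new StubClient().getTable("db", "tbl");
       } catch (UnsupportedOperationException e) {
         System.out.println("caught: " + e.getMessage());
       }
     }
   }
   ```

   The exception message can also point at a tracking issue, which doubles as the "clearly documented" marker.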



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientClassLoader.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Isolated client loader for Hive Metastore clients. This class creates an isolated classloader
+ * that loads Hive-specific classes from version-specific jar files while sharing common classes
+ * with the base classloader.
+ */
+public final class HiveClientClassLoader extends URLClassLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveClientClassLoader.class);
+
+  public enum HiveVersion {
+    HIVE2,
+    HIVE3,
+  }
+
+  private final ClassLoader baseClassLoader;
+  private final HiveVersion version;
+
+  /**
+   * Constructs a HiveClientClassLoader.
+   *
+   * @param version The Hive version
+   * @param execJars List of jar file URLs to load
+   * @param baseClassLoader The base classloader for shared classes
+   */
+  private HiveClientClassLoader(
+      HiveVersion version, List<URL> execJars, ClassLoader baseClassLoader) {
+    super(version.toString(), execJars.toArray(new URL[0]), null);
+    Preconditions.checkArgument(version != null, "Hive version cannot be null");
+    Preconditions.checkArgument(
+        execJars != null && !execJars.isEmpty(), "Jar URLs cannot be null or empty");
+    Preconditions.checkArgument(baseClassLoader != null, "Base classloader cannot be null");
+
+    this.version = version;
+    this.baseClassLoader = baseClassLoader;
+  }
+
+  public HiveVersion getHiveVersion() {
+    return version;
+  }
+
+  /**
+   * Creates a new {@link HiveClientClassLoader} instance for the given version.
+   *
+   * <p>This method does not perform any caching. Callers are responsible for managing and
+   * optionally caching returned instances.
+   *
+   * @param hiveVersion The Hive version to create a loader for.
+   * @param baseLoader The parent classloader to delegate shared classes to.
+   * @return A new {@link HiveClientClassLoader} instance.
+   */
+  public static HiveClientClassLoader createLoader(HiveVersion hiveVersion, ClassLoader baseLoader)
+      throws IOException {
+    Path jarDir = getJarDirectory(hiveVersion);
+    if (!Files.exists(jarDir) || !Files.isDirectory(jarDir)) {
+      throw new IOException("Hive jar directory does not exist or is not a directory: " + jarDir);
+    }
+
+    List<URL> jars = loadJarUrls(jarDir);
+    if (jars.isEmpty()) {
+      throw new IOException("No jar files found in directory: " + jarDir);
+    }
+
+    return new HiveClientClassLoader(hiveVersion, jars, baseLoader);
+  }
+
+  /**
+   * Gets the jar directory path for the specified Hive version.
+   *
+   * @param version The Hive version
+   * @return The path to the jar directory
+   */
+  private static Path getJarDirectory(HiveVersion version) {
+    String gravitinoHome = System.getenv("GRAVITINO_HOME");
+    if (StringUtils.isEmpty(gravitinoHome)) {
+      Path p = Paths.get(System.getProperty("user.dir"));
+      while (p != null) {
+        if (Files.exists(p.resolve("catalogs").resolve("hive-metastore-common"))) {
+          gravitinoHome = p.toString();
+          break;
+        }
+        p = p.getParent();
+      }
+    }
+
+    if (StringUtils.isEmpty(gravitinoHome)) {
+      throw new GravitinoRuntimeException(
+          "GRAVITINO_HOME environment variable is not set and cannot determine project root directory");
+    }
+
+    String libsDir = version == HiveVersion.HIVE2 ? "hive-metastore2-libs" : "hive-metastore3-libs";
+
+    // try to get path from GRAVITINO_HOME in deployment mode
+    String jarPath = Paths.get(gravitinoHome, "catalogs", "hive", "libs", libsDir).toString();
+    if (Files.exists(Paths.get(jarPath))) {
+      return Paths.get(jarPath).toAbsolutePath();
+    }
+    LOG.info("Can not find Hive jar directory for version {} in directory : {}", version, jarPath);
+
+    // Try to get project root directory from project root in development mode
+    jarPath = Paths.get(gravitinoHome, "catalogs", libsDir, "build", "libs").toString();
+    if (!Files.exists(Paths.get(jarPath))) {
+      throw new GravitinoRuntimeException(
+          "Cannot find Hive jar directory for version %s in directory <PROJECT_HOME>/catalogs/%s/build/libs "
+              + "or $GRAVITINO_HOME/catalogs/hive/libs/%s",
+          version, libsDir, libsDir);
+    }
+    return Paths.get(jarPath).toAbsolutePath();
+  }
+
+  /**
+   * Loads all jar file URLs from the specified directory.
+   *
+   * @param jarDir The directory containing jar files
+   * @return A list of jar file URLs
+   * @throws IOException If an I/O error occurs
+   */
+  private static List<URL> loadJarUrls(Path jarDir) throws IOException {
+    try (var stream = Files.list(jarDir)) {
+      return stream
+          .filter(p -> p.toString().endsWith(".jar"))
+          .map(
+              p -> {
+                try {
+                  return p.toUri().toURL();
+                } catch (Exception e) {
+                  throw new GravitinoRuntimeException(e, "Failed to convert path to URL: %s", p);
+                }
+              })
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new IOException("Failed to list jar files in directory: " + jarDir.toString(), e);
+    }
+  }
+
+  @Override
+  protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+    Class<?> loaded = findLoadedClass(name);
+    if (loaded != null) {
+      return loaded;
+    }
+    if (isBarrierClass(name)) {
+      return loadBarrierClass(name);
+    } else if (isSharedClass(name)) {
+      return loadSharedClass(name, resolve);
+    } else {
+      LOG.debug("Classloader {} loading isolated class {}", getName(), name);
+      return super.loadClass(name, resolve);
+    }
+  }
+
+  private Class<?> loadBarrierClass(String name) throws ClassNotFoundException {
+    LOG.debug("Classloader {} loading barrier class {}", getName(), name);
+    String classFileName = name.replace(".", "/") + ".class";
+    try (InputStream is = baseClassLoader.getResourceAsStream(classFileName)) {
+      if (is == null) {
+        throw new ClassNotFoundException("Cannot load barrier class: " + name);
+      }
+      byte[] bytes = is.readAllBytes();
+      return defineClass(name, bytes, 0, bytes.length);
+    } catch (IOException e) {
+      throw new ClassNotFoundException("Failed to load barrier class: " + name, e);
+    }
+  }
+
+  private Class<?> loadSharedClass(String name, boolean resolve) throws ClassNotFoundException {
+    LOG.debug("Classloader {} loading shared class {}", getName(), name);
+    try {
+      return baseClassLoader.loadClass(name);
+    } catch (ClassNotFoundException e) {
+      // Fallback to isolated classloader if not found in base
+      return super.loadClass(name, resolve);
+    }
+  }
+
+  /**
+   * Checks if a class should be shared with the base classloader.
+   *
+   * @param name The fully qualified class name
+   * @return true if the class should be shared, false otherwise
+   */
+  private boolean isSharedClass(String name) {
+    // Shared logging classes
+    if (name.startsWith("org.slf4j")
+        || name.startsWith("org.apache.log4j")
+        || name.startsWith("org.apache.logging.log4j")) {
+      return true;
+    }
+
+    // Shared Hadoop classes (excluding Hive-specific ones)
+    if (name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")) {
+      return true;
+    }
+
+    // Shared Google classes (excluding cloud-specific ones)
+    if (name.startsWith("com.google") && !name.startsWith("com.google.cloud")) {
+      return true;
+    }
+
+    // Java standard library classes
+    if (name.startsWith("java.")
+        || name.startsWith("javax.")
+        || name.startsWith("com.sun.")
+        || name.startsWith("org.ietf.jgss.")) {
+      return true;
+    }
+
+    // Gravitino classes
+    if (name.startsWith("org.apache.gravitino.") || name.startsWith(HiveClient.class.getName())) {

Review Comment:
   The condition `name.startsWith(HiveClient.class.getName())` is redundant: `HiveClient.class.getName()` returns "org.apache.gravitino.hive.client.HiveClient", so every name it matches (the interface itself, plus classes such as `HiveClientFactory` or `HiveClientImpl` whose fully qualified names happen to begin with that string) is already matched by the preceding `name.startsWith("org.apache.gravitino.")` condition. Note that it does not match arbitrary classes in the `hive.client` package, only those with that name prefix. If the intention is to share all classes in the `hive.client` package, this should be removed or clarified.
   ```suggestion
       if (name.startsWith("org.apache.gravitino.")) {
   ```
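
   The prefix semantics are easy to verify in isolation. This standalone sketch (class name and sample names are illustrative) shows that the narrow `HiveClient`-prefix check never matches anything the broad `org.apache.gravitino.` check misses:

   ```java
   // Demonstrates why name.startsWith(HiveClient.class.getName()) is redundant:
   // every name it matches already matches name.startsWith("org.apache.gravitino.").
   public class SharedClassCheckDemo {
     public static void main(String[] args) {
       // What HiveClient.class.getName() would return.
       String hiveClientPrefix = "org.apache.gravitino.hive.client.HiveClient";
       String gravitinoPrefix = "org.apache.gravitino.";

       String[] names = {
         "org.apache.gravitino.hive.client.HiveClient",        // the interface itself
         "org.apache.gravitino.hive.client.HiveClientFactory", // also matched: name begins with "HiveClient"
         "org.apache.gravitino.hive.client.Util"               // same package, but NOT matched by the narrow check
       };

       for (String name : names) {
         boolean byHiveClient = name.startsWith(hiveClientPrefix);
         boolean byGravitino = name.startsWith(gravitinoPrefix);
         System.out.println(name + " narrow=" + byHiveClient + " broad=" + byGravitino);
         // Whenever the narrow check matches, the broad one already matched.
         if (byHiveClient && !byGravitino) {
           throw new AssertionError("narrow check matched without the broad one");
         }
       }
     }
   }
   ```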



##########
catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/client/TestHiveClient.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.exceptions.ConnectionFailedException;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+// This class is used for manual testing against real Hive Metastore instances.
+@Disabled
+public class TestHiveClient {
+
+  private static final String HIVE2_HMS_URL = "thrift://172.17.0.4:9083";
+  private static final String HIVE2_HDFS_URL = "hdfs://172.17.0.4:9000";
+  private static final String HIVE3_HMS_URL = "thrift://172.17.0.3:9083";
+  private static final String HIVE3_HDFS_URL = "hdfs://172.17.0.3:9000";
+
+  private static final String KERBEROS_HIVE2_HMS_URL = "thrift://172.17.0.2:9083";
+  private static final String KERBEROS_HIVE2_HDFS_URL = "hdfs://172.17.0.2:9000";
+
+  @Test
+  void testHive2Client() throws Exception {
+    runHiveClientTest("", "hive2", HIVE2_HMS_URL, HIVE2_HDFS_URL + "/tmp/gravitino_test");
+  }
+
+  @Test
+  void testHive3DefaultCatalog() throws Exception {
+    // Hive3 default catalog is "hive", not empty string
+    runHiveClientTest(
        "hive", "hive3_default", HIVE3_HMS_URL, HIVE3_HDFS_URL + "/tmp/gravitino_test");
+  }
+
+  @Test
+  void testHive3SampleCatalog() throws Exception {
+    runHiveClientTest(
        "sample_catalog", "hive3_sample", HIVE3_HMS_URL, HIVE3_HDFS_URL + "/tmp/gravitino_test");
+  }
+
+  private void runHiveClientTest(
+      String catalogName, String testPrefix, String metastoreUri, String hdfsBasePath) {
+    Properties properties = new Properties();
+    properties.setProperty("hive.metastore.uris", metastoreUri);
+    HiveClient client = new HiveClientFactory(properties, "").createHiveClient();
+
+    String dbName = "gt_" + testPrefix + "_db_" + UUID.randomUUID().toString().replace("-", "");
+    String tableName = "gt_" + testPrefix + "_tbl_" + UUID.randomUUID().toString().replace("-", "");
+    String partitionValue = "p_" + UUID.randomUUID().toString().replace("-", "");
+    String partitionName = "dt=" + partitionValue;
+
+    String dbLocation = hdfsBasePath + "/" + dbName;
+    String tableLocation = hdfsBasePath + "/" + tableName;
+
+    HiveSchema schema = createTestSchema(catalogName, dbName, dbLocation);
+    HiveTable table = createTestTable(catalogName, dbName, tableName, tableLocation);
+    HivePartition partition = createTestPartition(partitionName, partitionValue);
+
+    try {
+      // Test database operations
+      client.createDatabase(schema);
+      List<String> allDatabases = client.getAllDatabases(catalogName);
+      Assertions.assertTrue(allDatabases.contains(dbName), "Database should be in the list");
+
+      HiveSchema loadedDb = client.getDatabase(catalogName, dbName);
+      Assertions.assertNotNull(loadedDb, "Loaded database should not be null");
+      Assertions.assertEquals(dbName, loadedDb.name(), "Database name should match");
+      Assertions.assertEquals(
+          schema.comment(), loadedDb.comment(), "Database comment should match");
+
+      client.alterDatabase(catalogName, dbName, schema);
+      HiveSchema alteredDb = client.getDatabase(catalogName, dbName);
+      Assertions.assertNotNull(alteredDb, "Altered database should not be null");
+
+      // Test table operations
+      client.createTable(table);
+      List<String> allTables = client.getAllTables(catalogName, dbName);
+      Assertions.assertTrue(allTables.contains(tableName), "Table should be in the list");
+
+      HiveTable loadedTable = client.getTable(catalogName, dbName, tableName);
+      Assertions.assertNotNull(loadedTable, "Loaded table should not be null");
+      Assertions.assertEquals(tableName, loadedTable.name(), "Table name should match");
+      Assertions.assertEquals(table.comment(), loadedTable.comment(), "Table comment should match");
+      Assertions.assertEquals(2, loadedTable.columns().length, "Table should have 2 columns");
+      Assertions.assertEquals(
+          1, loadedTable.partitioning().length, "Table should have 1 partition key");
+
+      client.alterTable(catalogName, dbName, tableName, loadedTable);
+      HiveTable alteredTable = client.getTable(catalogName, dbName, tableName);
+      Assertions.assertNotNull(alteredTable, "Altered table should not be null");
+
+      List<String> filteredTables =
+          client.listTableNamesByFilter(catalogName, dbName, "", (short) 10);
+      Assertions.assertTrue(
+          filteredTables.contains(tableName), "Filtered tables should contain the table");
+
+      List<HiveTable> tableObjects =
+          client.getTableObjectsByName(catalogName, dbName, List.of(tableName));
+      Assertions.assertEquals(1, tableObjects.size(), "Should get exactly one table object");
+      Assertions.assertEquals(
+          tableName, tableObjects.get(0).name(), "Table object name should match");
+
+      // Test partition operations
+      HivePartition addedPartition = client.addPartition(loadedTable, partition);
+      Assertions.assertNotNull(addedPartition, "Added partition should not be null");
+      Assertions.assertEquals(partitionName, addedPartition.name(), "Partition name should match");
+
+      List<String> partitionNames = client.listPartitionNames(loadedTable, (short) 10);
+      Assertions.assertTrue(
+          partitionNames.contains(partitionName), "Partition should be in the list");
+
+      List<HivePartition> partitions = client.listPartitions(loadedTable, (short) 10);
+      Assertions.assertEquals(1, partitions.size(), "Should have exactly one partition");
+      Assertions.assertEquals(
+          partitionName, partitions.get(0).name(), "Partition name should match");
+
+      List<HivePartition> filteredPartitions =
+          client.listPartitions(loadedTable, List.of(partitionValue), (short) 10);
+      Assertions.assertEquals(
+          1, filteredPartitions.size(), "Should have exactly one filtered partition");
+
+      HivePartition fetchedPartition = client.getPartition(loadedTable, addedPartition.name());
+      Assertions.assertNotNull(fetchedPartition, "Fetched partition should not be null");
+      Assertions.assertEquals(
+          partitionName, fetchedPartition.name(), "Fetched partition name should match");
+
+      client.dropPartition(catalogName, dbName, tableName, addedPartition.name(), true);
+      List<String> partitionNamesAfterDrop = client.listPartitionNames(loadedTable, (short) 10);
+      Assertions.assertFalse(
+          partitionNamesAfterDrop.contains(partitionName),
+          "Partition should not be in the list after drop");
+
+      // Test delegation token (may not be available in all environments)
+      try {
+        String token =
+            client.getDelegationToken(
                System.getProperty("user.name"), System.getProperty("user.name"));
+        Assertions.assertNotNull(token, "Delegation token should not be null");
+      } catch (Exception e) {
+        // Delegation token may not be available, this is acceptable
+      }
+
+      // Cleanup
+      client.dropTable(catalogName, dbName, tableName, true, true);
+      List<String> tablesAfterDrop = client.getAllTables(catalogName, dbName);
+      Assertions.assertFalse(
+          tablesAfterDrop.contains(tableName), "Table should not be in the list after drop");
+
+      client.dropDatabase(catalogName, dbName, true);
+      List<String> databasesAfterDrop = client.getAllDatabases(catalogName);
+      Assertions.assertFalse(
+          databasesAfterDrop.contains(dbName), "Database should not be in the list after drop");
+    } finally {
+      safelyDropTable(client, catalogName, dbName, tableName);
+      safelyDropDatabase(client, catalogName, dbName);
+    }
+  }
+
+  private HiveSchema createTestSchema(String catalogName, String dbName, String location) {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(HiveConstants.LOCATION, location);
+    return HiveSchema.builder()
+        .withName(dbName)
+        .withComment("Test schema for HiveClient operations")
+        .withProperties(properties)
+        .withAuditInfo(defaultAudit())
+        .withCatalogName(catalogName)
+        .build();
+  }
+
+  private HiveTable createTestTable(
+      String catalogName, String databaseName, String tableName, String location) {
+    Column idColumn = Column.of("id", Types.IntegerType.get(), null, false, false, null);
+    Column dtColumn = Column.of("dt", Types.StringType.get());
+    Map<String, String> properties = new HashMap<>();
+    properties.put(HiveConstants.LOCATION, location);
+    return HiveTable.builder()
+        .withName(tableName)
+        .withColumns(new Column[] {idColumn, dtColumn})
+        .withComment("Test table for HiveClient operations")
+        .withProperties(properties)
+        .withAuditInfo(defaultAudit())
+        .withPartitioning(new Transform[] {Transforms.identity("dt")})
+        .withCatalogName(catalogName)
+        .withDatabaseName(databaseName)
+        .build();
+  }
+
+  private HivePartition createTestPartition(String partitionName, String value) {
+    HivePartition partition =
+        HivePartition.identity(
+            new String[][] {new String[] {"dt"}},
+            new Literal<?>[] {Literals.stringLiteral(value)},
+            Map.of());
+    Assertions.assertEquals(partitionName, partition.name());
+    return partition;
+  }
+
+  private AuditInfo defaultAudit() {
+    return AuditInfo.builder()
+        .withCreator(System.getProperty("user.name", "gravitino"))
+        .withCreateTime(Instant.now())
+        .build();
+  }
+
+  private void safelyDropTable(
+      HiveClient client, String catalogName, String dbName, String tableName) {
+    try {
+      client.dropTable(catalogName, dbName, tableName, true, true);
+    } catch (Exception ignored) {
+      // ignore cleanup failures
+    }
+  }
+
+  private void safelyDropDatabase(HiveClient client, String catalogName, String dbName) {
+    try {
+      client.dropDatabase(catalogName, dbName, true);
+    } catch (Exception ignored) {
+      // ignore cleanup failures
+    }
+  }
+
+  @Test
+  void testHiveExceptionHandling() throws Exception {
+    testHiveExceptionHandlingForVersion("", HIVE2_HMS_URL, HIVE2_HDFS_URL);
+  }
+
+  @Test
+  void testHive3ExceptionHandling() throws Exception {
+    testHiveExceptionHandlingForVersion("hive", HIVE3_HMS_URL, HIVE3_HDFS_URL);
+  }
+
+  private void testHiveExceptionHandlingForVersion(
+      String catalogName, String metastoreUri, String hdfsBasePath) throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty("hive.metastore.uris", metastoreUri);
+    HiveClient client = new HiveClientFactory(properties, "").createHiveClient();
+
+    String dbName = "gt_exception_test_db_" + UUID.randomUUID().toString().replace("-", "");
+    String tableName = "gt_exception_test_tbl_" + UUID.randomUUID().toString().replace("-", "");
+    String partitionValue = "p_" + UUID.randomUUID().toString().replace("-", "");
+    String partitionName = "dt=" + partitionValue;
+
+    String dbLocation = hdfsBasePath + "/" + dbName;
+    String tableLocation = hdfsBasePath + "/" + tableName;
+
+    HiveSchema schema = createTestSchema(catalogName, dbName, dbLocation);
+    HiveTable table = createTestTable(catalogName, dbName, tableName, tableLocation);
+    HivePartition partition = createTestPartition(partitionName, partitionValue);
+
+    try {
+      // Test SchemaAlreadyExistsException - create database twice
+      try {
+        client.createDatabase(schema);
+      } catch (GravitinoRuntimeException e) {
+        // If permission error occurs, skip this test
+        if (e.getCause() != null
+            && e.getCause().getMessage() != null
+            && e.getCause().getMessage().contains("Permission denied")) {
+          return; // Skip test if permission denied
+        }
+        throw e;
+      }
+      Assertions.assertThrows(
+          SchemaAlreadyExistsException.class, () -> client.createDatabase(schema));
+
+      // Test NoSuchSchemaException - get non-existent database
+      Assertions.assertThrows(
+          NoSuchSchemaException.class,
+          () -> client.getDatabase(catalogName, "non_existent_db_" + UUID.randomUUID()));
+
+      // Test TableAlreadyExistsException - create table twice
+      client.createTable(table);
+      Assertions.assertThrows(TableAlreadyExistsException.class, () -> client.createTable(table));
+
+      // Test NoSuchTableException - get non-existent table
+      Assertions.assertThrows(
+          NoSuchTableException.class,
+          () -> client.getTable(catalogName, dbName, "non_existent_table_" + UUID.randomUUID()));
+
+      // Test PartitionAlreadyExistsException - add partition twice
+      HiveTable loadedTable = client.getTable(catalogName, dbName, tableName);
+      HivePartition addedPartition = client.addPartition(loadedTable, partition);
+      Assertions.assertNotNull(addedPartition, "Added partition should not be null");
+      Assertions.assertThrows(
+          PartitionAlreadyExistsException.class, () -> client.addPartition(loadedTable, partition));
+
+      // Test NoSuchPartitionException - get non-existent partition
+      Assertions.assertThrows(
+          NoSuchPartitionException.class,
+          () -> client.getPartition(loadedTable, "dt=non_existent_partition_" + UUID.randomUUID()));
+
+      // Test NonEmptySchemaException - try to drop database with tables (cascade=false)
+      Exception exception =
+          Assertions.assertThrows(
+              Exception.class, () -> client.dropDatabase(catalogName, dbName, false));
+      // Hive may throw different exceptions for non-empty database
+      // The converter should handle it appropriately
+      Assertions.assertTrue(
+          exception instanceof NonEmptySchemaException
+              || exception instanceof GravitinoRuntimeException,
+          "Should throw NonEmptySchemaException or GravitinoRuntimeException, got: "
+              + exception.getClass().getName());
+
+      // Cleanup
+      client.dropPartition(catalogName, dbName, tableName, addedPartition.name(), true);
+      client.dropTable(catalogName, dbName, tableName, true, true);
+      client.dropDatabase(catalogName, dbName, true);
+    } finally {
+      safelyDropTable(client, catalogName, dbName, tableName);
+      safelyDropDatabase(client, catalogName, dbName);
+    }
+  }
+
+  private void testConnectionFailedExceptionForVersion(String catalogName) {
+    // Test with invalid/unreachable Hive Metastore URI
+    String invalidMetastoreUri = "thrift://127.0.0.1:9999";
+    Properties properties = new Properties();
+    properties.setProperty("hive.metastore.uris", invalidMetastoreUri);
+
+    // Connection failure may occur during client creation or operation
+    // Both should be converted to ConnectionFailedException
+    Exception exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () -> {
+              HiveClient client = new HiveClientFactory(properties, "").createHiveClient();
+              client.getAllDatabases(catalogName);
+            });
+
+    // Verify the exception is converted to ConnectionFailedException
+    Assertions.assertTrue(
+        exception instanceof ConnectionFailedException,
+        "Should throw ConnectionFailedException, got: " + exception.getClass().getName());
+    Assertions.assertNotNull(
+        ((ConnectionFailedException) exception).getCause(), "Exception should have a cause");
+  }
+
+  @Test
+  void testConnectionFailedException() throws Exception {
+    // Test with HIVE2
+    testConnectionFailedExceptionForVersion("");
+
+    // Test with HIVE3
+    testConnectionFailedExceptionForVersion("hive");
+  }
+
+  @Test
+  void testKerberosConnection() {
+    // This method can be implemented to test Kerberos authentication with Hive Metastore
+    // when a Kerberos-enabled environment is available.
+    Properties properties = new Properties();
+    properties.setProperty("hive.metastore.uris", KERBEROS_HIVE2_HMS_URL);
+    properties.setProperty("authentication.kerberos.principal", "cli@HADOOPKRB");
+    properties.setProperty("authentication.impersonation-enable", "true");
+    properties.setProperty(
+        "authentication.kerberos.keytab-uri", "/tmp/test4310082059861441407/client.keytab");
+    properties.setProperty("hive.metastore.kerberos.principal", "hive/6b1955fcb754@HADOOPKRB");
+    properties.setProperty("hive.metastore.sasl.enabled", "true");
+    properties.setProperty("hadoop.security.authentication", "kerberos");
+
+    System.setProperty("java.security.krb5.conf", "/tmp/test4310082059861441407/krb5.conf");

Review Comment:
   Hardcoded temporary paths like `/tmp/test4310082059861441407/client.keytab` 
and `/tmp/test4310082059861441407/krb5.conf` are unlikely to exist and make the 
test fragile. These should either be created dynamically using a temporary 
directory, made configurable, or the test should validate that the files exist 
before attempting to use them.
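
   In a JUnit 5 test the usual fix is `@TempDir`; as a framework-free sketch of the same idea (the class name and helper method below are illustrative, not part of this PR), the fixture files and paths could be generated per run instead of hardcoded:

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.Properties;

   public class KerberosTempFixtures {
     // Creates a fresh per-run temp directory holding the Kerberos fixture
     // files, instead of relying on a hardcoded /tmp path that may not exist.
     public static Path createFixtureDir() throws IOException {
       Path dir = Files.createTempDirectory("hive-kerberos-test");
       // Placeholders; a real test would copy a keytab/krb5.conf from the
       // Kerberos test environment into these paths.
       Files.createFile(dir.resolve("client.keytab"));
       Files.createFile(dir.resolve("krb5.conf"));
       return dir;
     }

     public static void main(String[] args) throws IOException {
       Path dir = createFixtureDir();
       Properties properties = new Properties();
       properties.setProperty(
           "authentication.kerberos.keytab-uri", dir.resolve("client.keytab").toString());
       System.setProperty("java.security.krb5.conf", dir.resolve("krb5.conf").toString());
       System.out.println(Files.exists(dir.resolve("krb5.conf"))); // true
     }
   }
   ```

   Alternatively, the test could check `Files.exists(...)` on configured paths and skip itself (e.g. via JUnit assumptions) when the Kerberos fixtures are absent.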



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveExceptionConverter.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+/**
+ * Utility class to convert Hive exceptions to Gravitino exceptions. This class handles various
+ * types of exceptions that can be thrown by Hive Metastore operations, including:
+ *
+ * <ul>
+ *   <li>Reflection exceptions (InvocationTargetException)
+ *   <Li>Hive Metastore exceptions (e.g., AlreadyExistsException, NoSuchObjectException,

Review Comment:
   Inconsistent HTML tag capitalization: `<Li>` should be `<li>` to match the other list items in the Javadoc.
   ```suggestion
    *   <li>Hive Metastore exceptions (e.g., AlreadyExistsException, NoSuchObjectException,
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE2;
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE3;
+import static org.apache.gravitino.hive.client.Util.buildConfigurationFromProperties;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.Properties;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HiveClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveClientFactory.class);
+
+  // Remember which Hive backend version worked successfully for this factory.
+  private volatile HiveClientClassLoader.HiveVersion backendVersion;
+  private volatile HiveClientClassLoader backendClassLoader;
+
+  @SuppressWarnings("UnusedVariable")
+  private final Configuration hadoopConf;

Review Comment:
   The field `hadoopConf` is annotated with 
`@SuppressWarnings("UnusedVariable")` but is never used anywhere in the class. 
If it's not needed, remove it to reduce code clutter. If it's intended for 
future use, document that purpose.



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.COMMENT;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Optional;
+import java.util.Set;
+import lombok.ToString;
+import org.apache.gravitino.catalog.hive.TableType;
+import org.apache.gravitino.connector.BaseTable;
+import org.apache.gravitino.connector.ProxyPlugin;
+import org.apache.gravitino.connector.TableOperations;
+
+/** Represents an Apache Hive Table entity in the Hive Metastore catalog. */
+@ToString
+public class HiveTable extends BaseTable {
+
+  // A set of supported Hive table types.
+  public static final Set<String> SUPPORT_TABLE_TYPES =
+      Sets.newHashSet(TableType.MANAGED_TABLE.name(), TableType.EXTERNAL_TABLE.name());
+  public static final String ICEBERG_TABLE_TYPE_VALUE = "ICEBERG";
+  public static final String TABLE_TYPE_PROP = "table_type";
+  private String catalogName;
+  private String databaseName;
+
+  protected HiveTable() {}
+
+  public String catalogName() {
+    return catalogName;
+  }
+
+  public String databaseName() {
+    return databaseName;
+  }
+
+  public void setProxyPlugin(ProxyPlugin plugin) {
+    this.proxyPlugin = Optional.ofNullable(plugin);
+  }
+
+  @Override
+  protected TableOperations newOps() throws UnsupportedOperationException {
+    throw new UnsupportedOperationException();
+  }
+
+  /** A builder class for constructing HiveTable instances. */
+  public static class Builder extends BaseTableBuilder<Builder, HiveTable> {
+
+    private String catalogName;
+    private String databaseName;
+
+    /** Creates a new instance of {@link Builder}. */
+    private Builder() {}

Review Comment:
   The `HiveTable` class is missing a `public static Builder builder()` method. 
The test class `TestHiveClient.java` at line 218 calls `HiveTable.builder()`, 
but this method is not defined in the `HiveTable` class. Add the following 
method to enable the builder pattern:
   
   ```java
   public static Builder builder() {
     return new Builder();
   }
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.client;
+
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE2;
+import static org.apache.gravitino.hive.client.HiveClientClassLoader.HiveVersion.HIVE3;
+import static org.apache.gravitino.hive.client.Util.buildConfigurationFromProperties;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.Properties;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HiveClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveClientFactory.class);
+
+  // Remember which Hive backend version worked successfully for this factory.
+  private volatile HiveClientClassLoader.HiveVersion backendVersion;
+  private volatile HiveClientClassLoader backendClassLoader;
+
+  @SuppressWarnings("UnusedVariable")
+  private final Configuration hadoopConf;
+
+  private final Properties properties;
+
+  /**
+   * Creates a {@link HiveClientFactory} bound to the given configuration properties.
+   *
+   * @param properties Hive client configuration, must not be null.
+   * @param id An identifier for this factory instance.
+   */
+  public HiveClientFactory(Properties properties, String id) {
+    Preconditions.checkArgument(properties != null, "Properties cannot be null");
+    this.properties = properties;
+
+    try {
+      this.hadoopConf = buildConfigurationFromProperties(properties);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize HiveClientFactory", e);
+    }
+  }
+
+  /** Creates a {@link HiveClient} using the properties associated with this factory. */
+  public HiveClient createHiveClient() {
+    HiveClient client = null;
+    try {
+      if (backendVersion != null) {
+        HiveClientClassLoader classLoader = getOrCreateClassLoader(backendVersion);
+        client = createHiveClientInternal(classLoader);
+        LOG.info("Connected to Hive Metastore using cached Hive version {}", backendVersion.name());
+        return client;
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to connect to Hive Metastore using cached Hive version {}", backendVersion, e);
+      throw new RuntimeException("Failed to connect to Hive Metastore", e);

Review Comment:
   When an exception occurs while using the cached backend version (line 
76-79), the exception is wrapped in a generic `RuntimeException` instead of 
using `HiveExceptionConverter.toGravitinoException()`. This is inconsistent 
with the exception handling pattern used elsewhere (e.g., lines 116-117, 
170-173) and may not properly convert Hive-specific exceptions to appropriate 
Gravitino exceptions.
   ```suggestion
         throw HiveExceptionConverter.toGravitinoException(
              e, HiveExceptionConverter.ExceptionTarget.other("Failed to connect to Hive Metastore"));
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HivePartition.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.partitions.IdentityPartition;
+
+/** Represents a Hive identity partition with helpers to build and parse partition specs. */
+public final class HivePartition implements IdentityPartition {
+
+  private static final String PARTITION_NAME_DELIMITER = "/";
+  private static final String PARTITION_VALUE_DELIMITER = "=";
+
+  private final String name;
+  private final String[][] fieldNames;
+  private final Literal<?>[] values;
+  private final Map<String, String> properties;
+
+  private HivePartition(
+      String name, String[][] fieldNames, Literal<?>[] values, Map<String, String> properties) {
+    Preconditions.checkArgument(fieldNames != null, "Partition field names must not be null");
+    Preconditions.checkArgument(values != null, "Partition values must not be null");
+    Preconditions.checkArgument(
+        fieldNames.length == values.length,
+        "Partition field names size %s must equal values size %s",
+        fieldNames.length,
+        values.length);
+    Arrays.stream(fieldNames)
+        .forEach(
+            fn ->
+                Preconditions.checkArgument(
+                    fn.length == 1, "Hive catalog does not support nested partition field names"));
+
+    this.fieldNames = fieldNames;
+    this.values = values;
+    this.properties = properties;
+    this.name = StringUtils.isNotEmpty(name) ? name : buildPartitionName();
+    Preconditions.checkArgument(
+        StringUtils.isNotEmpty(this.name), "Partition name must not be null or empty");
+  }
+
+  public static HivePartition identity(String[][] fieldNames, Literal<?>[] values) {
+    return identity(fieldNames, values, Collections.emptyMap());
+  }
+
+  public static HivePartition identity(
+      String[][] fieldNames, Literal<?>[] values, Map<String, String> properties) {
+    return new HivePartition(null, fieldNames, values, properties);
+  }
+
+  public static HivePartition identity(String partitionName) {
+    return identity(partitionName, Collections.emptyMap());
+  }
+
+  public static HivePartition identity(String partitionName, Map<String, String> properties) {
+    Preconditions.checkArgument(
+        StringUtils.isNotEmpty(partitionName), "Partition name must not be null or empty");
+    String[][] fieldNames = extractPartitionFieldNames(partitionName);
+    Literal<?>[] values =
+        extractPartitionValues(partitionName).stream()
+            .map(Literals::stringLiteral)
+            .toArray(Literal[]::new);
+    return new HivePartition(partitionName, fieldNames, values, properties);
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  public String[][] fieldNames() {
+    return fieldNames;
+  }
+
+  public Literal<?>[] values() {
+    return values;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  private String buildPartitionName() {
+    return IntStream.range(0, fieldNames.length)
+        .mapToObj(idx -> fieldNames[idx][0] + PARTITION_VALUE_DELIMITER + values[idx].value())
+        .collect(Collectors.joining(PARTITION_NAME_DELIMITER));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof HivePartition)) {
+      return false;
+    }
+    HivePartition that = (HivePartition) o;
+    return Objects.equals(name, that.name)
+        && Arrays.deepEquals(fieldNames, that.fieldNames)
+        && Arrays.equals(values, that.values)
+        && Objects.equals(properties, that.properties);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Objects.hash(name, properties);
+    result = 31 * result + Arrays.deepHashCode(fieldNames);
+    result = 31 * result + Arrays.hashCode(values);
+    return result;
+  }
+
+  public static List<String> extractPartitionValues(String partitionName) {
+    if (StringUtils.isEmpty(partitionName)) {
+      return Collections.emptyList();
+    }
+    return Arrays.stream(partitionName.split(PARTITION_NAME_DELIMITER))
+        .map(
+            field -> {
+              String[] kv = field.split(PARTITION_VALUE_DELIMITER, 2);
+              return kv.length > 1 ? kv[1] : "";
+            })
+        .collect(Collectors.toList());
+  }
+
+  public static List<String> extractPartitionValues(
+      List<String> partitionFieldNames, String partitionSpec) {
+    Preconditions.checkArgument(
+        partitionFieldNames != null, "Partition field names must not be null");
+    if (partitionFieldNames.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    Map<String, String> partSpecMap = new HashMap<>();
+    if (StringUtils.isNotEmpty(partitionSpec)) {
+      Arrays.stream(partitionSpec.split(PARTITION_NAME_DELIMITER))
+          .forEach(
+              part -> {
+                String[] keyValue = part.split(PARTITION_VALUE_DELIMITER, 2);
+                if (keyValue.length != 2) {
+                  throw new IllegalArgumentException(
+                      String.format("Error partition format: %s", partitionSpec));

Review Comment:
   The error message "Error partition format: %s" is grammatically incorrect. 
It should be "Invalid partition format: %s" or "Incorrect partition format: %s" 
for better clarity.
   ```suggestion
                        String.format("Invalid partition format: %s", partitionSpec));
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.client;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.gravitino.hive.HivePartition;
+import org.apache.gravitino.hive.HiveSchema;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.thrift.TException;
+
+/**
+ * Java translation of Scala's `Shim` sealed abstract class.
+ *
+ * <p>This class declares the compatibility layer between Spark and different Hive versions.
+ * Concrete subclasses (e.g. Shim_v0_12, Shim_v0_13, Shim_v2_3, Shim_v3_0 ...) must implement these
+ * methods according to the behavior of the corresponding Hive release.

Review Comment:
   The class documentation incorrectly states "Java translation of Scala's 
`Shim` sealed abstract class" and references "compatibility layer between Spark 
and different Hive versions." This appears to be copied from Spark code but 
this is for Apache Gravitino's Hive catalog, not Spark. The documentation 
should be updated to accurately reflect that this is Gravitino's compatibility 
layer for different Hive Metastore versions.
   ```suggestion
    * Compatibility layer for different Hive Metastore versions in Apache Gravitino.
    *
    * <p>This abstract class defines the interface for interacting with various versions of the Hive Metastore.
    * Concrete subclasses (e.g., Shim_v0_12, Shim_v0_13, Shim_v2_3, Shim_v3_0, etc.) must implement these
    * methods according to the behavior of the corresponding Hive Metastore release.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
