Copilot commented on code in PR #9458:
URL: https://github.com/apache/gravitino/pull/9458#discussion_r2609039998


##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FetchFileUtils {
+
+  private FetchFileUtils() {}
+
+  public static void fetchFileFromUri(
+      String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException {
+    try {
+      URI uri = new URI(fileUri);
+      String scheme = java.util.Optional.ofNullable(uri.getScheme()).orElse("file");
+
+      switch (scheme) {
+        case "http":
+        case "https":
+        case "ftp":
+          FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000);
+          break;
+
+        case "file":
+          Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath());
+          break;
+
+        case "hdfs":
+          FileSystem.get(conf).copyToLocalFile(new Path(uri), new Path(destFile.toURI()));
+          break;
+          break;
+
+        default:
+          throw new IllegalArgumentException(
+              String.format("Doesn't support the scheme %s", scheme));

Review Comment:
   The error message phrasing "Doesn't support the scheme %s" is awkward and reads unclearly. Use "Unsupported scheme" or "The scheme '{scheme}' is not supported".
   ```suggestion
                 String.format("The scheme '%s' is not supported", scheme));
   ```
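   For context on the scheme dispatch under review, here is a minimal stdlib-only sketch (independent of the PR's `FetchFileUtils`; the class and method names are placeholders) showing how the `Optional.ofNullable(uri.getScheme()).orElse("file")` fallback behaves for URIs with and without an explicit scheme:

   ```java
   import java.net.URI;
   import java.util.Optional;

   public class SchemeDispatchDemo {
     // Resolve the scheme the same way the reviewed code does: a URI without an
     // explicit scheme (e.g. a bare local path) falls back to "file".
     static String resolveScheme(String fileUri) throws Exception {
       URI uri = new URI(fileUri);
       return Optional.ofNullable(uri.getScheme()).orElse("file");
     }

     public static void main(String[] args) throws Exception {
       System.out.println(resolveScheme("hdfs://nn:8020/keytabs/a.keytab")); // hdfs
       System.out.println(resolveScheme("/tmp/local.keytab"));               // file (no scheme)
       System.out.println(resolveScheme("https://host/a.keytab"));           // https
     }
   }
   ```

   This also shows why the `default` branch matters: any scheme outside the handled set reaches the error message being discussed.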



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java:
##########
@@ -18,12 +18,80 @@
  */
 package org.apache.gravitino.hive.client;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+import org.apache.hadoop.security.UserGroupInformation;
 

Review Comment:
   The class ProxyHiveClientImpl is missing public API documentation. Given 
that this class implements a security-sensitive proxy pattern for Kerberos 
authentication, it should have comprehensive class-level Javadoc explaining its 
purpose, thread-safety characteristics, and usage constraints. The 
documentation should clarify when and how this proxy should be used versus the 
direct HiveClient implementation.
   ```suggestion
   
   /**
    * A dynamic proxy {@link InvocationHandler} for {@link HiveClient} that ensures all method calls are executed
    * under the context of a specific Kerberos principal using {@link UserGroupInformation#doAs(PrivilegedExceptionAction)}.
    * <p>
    * <b>Purpose:</b> This class is used to wrap a {@link HiveClient} so that all operations are performed with the
    * privileges of the specified Kerberos-authenticated user. This is essential in environments where secure
    * authentication and authorization are required for Hive Metastore access.
    * <p>
    * <b>Thread-safety:</b> This class is thread-safe as long as the underlying {@link HiveClient} implementation is
    * thread-safe and the provided {@link UserGroupInformation} is not mutated after construction. The proxy itself
    * does not maintain any mutable state after initialization.
    * <p>
    * <b>Usage constraints:</b>
    * <ul>
    *   <li>Instances must be created via the static {@link #createClient} method; direct instantiation is not supported.</li>
    *   <li>Kerberos authentication must be properly configured, and the {@link UserGroupInformation} must represent a
    *       valid, logged-in user (typically via keytab or ticket cache).</li>
    *   <li>This proxy should be used when Hive Metastore operations must be performed as a specific Kerberos principal.
    *       For non-secure or simple authentication scenarios, a direct {@link HiveClient} implementation may be used instead.</li>
    * </ul>
    * <p>
    * <b>Security note:</b> Improper use of this proxy (e.g., with an incorrect or uninitialized {@link UserGroupInformation})
    * may result in authentication failures or privilege escalation. Always ensure the Kerberos context is valid before use.
    */
   ```
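   As background for the proxy pattern this comment documents, below is a self-contained sketch of the same `Proxy`/`InvocationHandler` technique. It has no Hadoop dependency: the `Greeter` interface and the user-tagging logic are placeholders standing in for `HiveClient` and `UserGroupInformation.doAs(...)`, not part of the PR:

   ```java
   import java.lang.reflect.InvocationHandler;
   import java.lang.reflect.Proxy;

   public class DoAsProxyDemo {
     interface Greeter {
       String greet(String name);
     }

     // Invocation handler that routes every call through a "run as user" hook,
     // analogous to wrapping each HiveClient call in ugi.doAs(...).
     static Greeter wrap(Greeter delegate, String user) {
       InvocationHandler handler =
           (proxy, method, args) -> {
             // In the real code this is ugi.doAs(PrivilegedExceptionAction);
             // here we just tag the result with the acting user.
             Object result = method.invoke(delegate, args);
             return "[as " + user + "] " + result;
           };
       return (Greeter)
           Proxy.newProxyInstance(
               Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
     }

     public static void main(String[] args) {
       Greeter plain = name -> "hello " + name;
       Greeter proxied = wrap(plain, "alice@EXAMPLE.COM");
       System.out.println(proxied.greet("bob")); // [as alice@EXAMPLE.COM] hello bob
     }
   }
   ```

   The key property, which the requested Javadoc should spell out, is that the handler intercepts every interface method uniformly, so no call can bypass the security context.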



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java:
##########
@@ -38,14 +41,19 @@
 public final class HiveClientFactory {
   private static final Logger LOG = LoggerFactory.getLogger(HiveClientFactory.class);
 
+  public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-hive-%s-keytab";
+

Review Comment:
   The constant GRAVITINO_KEYTAB_FORMAT is defined but never used in this 
class. It appears the actual keytab path format is constructed differently in 
the constructor using String.format("keytabs/gravitino-%s-keytab", id). Either 
this constant should be used or removed to avoid confusion.
   ```suggestion
   
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.hadoop.conf.Configuration;
+
+public class KerberosConfig extends AuthenticationConfig {
+  public static final String KEY_TAB_URI_KEY = "authentication.kerberos.keytab-uri";
+
+  public static final String PRINCIPAL_KEY = "authentication.kerberos.principal";
+
+  public static final String CHECK_INTERVAL_SEC_KEY = "authentication.kerberos.check-interval-sec";
+
+  public static final String FETCH_TIMEOUT_SEC_KEY =
+      "authentication.kerberos.keytab-fetch-timeout-sec";
+
+  public static final ConfigEntry<String> PRINCIPAL_ENTRY =
+      new ConfigBuilder(PRINCIPAL_KEY)
+          .doc("The principal of the Kerberos connection")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<String> KEYTAB_ENTRY =
+      new ConfigBuilder(KEY_TAB_URI_KEY)
+          .doc("The keytab of the Kerberos connection")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<Integer> CHECK_INTERVAL_SEC_ENTRY =
+      new ConfigBuilder(CHECK_INTERVAL_SEC_KEY)
+          .doc("The check interval of the Kerberos connection for Hudi catalog")

Review Comment:
   The documentation mentions "Hudi catalog" but this configuration is for the 
Hive catalog. This appears to be a copy-paste error from another catalog 
implementation.
   ```suggestion
             .doc("The check interval of the Kerberos connection")
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+

Review Comment:
   The KerberosClient class is missing class-level Javadoc documentation. Given 
the complexity and security-sensitive nature of this class (handling Kerberos 
authentication, delegation tokens, and user impersonation), comprehensive 
class-level documentation is essential to explain its purpose, lifecycle, 
thread-safety characteristics, and usage patterns.
   ```suggestion
   
   /**
    * KerberosClient handles Kerberos authentication, delegation token management, and user impersonation
    * for secure interactions with Hive Metastore and Hadoop services.
    * <p>
    * <b>Purpose:</b>
    * <ul>
    *   <li>Performs Kerberos login using a principal and keytab file.</li>
    *   <li>Manages delegation tokens for secure communication with Hive Metastore.</li>
    *   <li>Supports user impersonation (proxy users) for multi-user environments.</li>
    *   <li>Optionally refreshes Kerberos credentials periodically to maintain long-lived sessions.</li>
    * </ul>
    * <p>
    * <b>Lifecycle:</b>
    * <ul>
    *   <li>Instantiate with configuration, Hadoop configuration, refresh flag, and keytab path.</li>
    *   <li>Call {@link #login()} to perform Kerberos authentication and initialize the client.</li>
    *   <li>Use {@link #loginProxyUser(String)} to obtain a proxy user UGI for impersonation.</li>
    *   <li>Call {@link #close()} to release resources and stop background threads.</li>
    * </ul>
    * <p>
    * <b>Thread-safety:</b>
    * <ul>
    *   <li>Most methods are thread-safe for concurrent use, but the underlying UGI and HiveClient
    *       references are managed with volatile fields for safe publication.</li>
    *   <li>Background credential refresh is managed by a single-threaded executor.</li>
    * </ul>
    * <p>
    * <b>Usage patterns:</b>
    * <pre>
    *   KerberosClient client = new KerberosClient(props, hadoopConf, true, "/path/to/keytab");
    *   UserGroupInformation ugi = client.login();
    *   // Use ugi or create proxy users as needed
    *   client.close();
    * </pre>
    * <p>
    * <b>Security considerations:</b>
    * <ul>
    *   <li>Ensure the keytab file is securely stored and access is restricted.</li>
    *   <li>Principal and keytab must match the service identity configured in Hadoop/Hive.</li>
    *   <li>Always call {@link #close()} to prevent credential leaks and stop background threads.</li>
    * </ul>
    */
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt"));
+      }
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              loginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return loginUgi;
+  }
+
+  public File saveKeyTabFileFromUri(String path) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS");
+
+    File keytabsDir = new File("keytabs");
+    if (!keytabsDir.exists()) {
+      keytabsDir.mkdir();
+    }
+    File keytabFile = new File(path);
+    keytabFile.deleteOnExit();
+    if (keytabFile.exists() && !keytabFile.delete()) {
+      throw new IllegalStateException(
+          String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath()));
+    }
+    int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec();
+    FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf);
+    return keytabFile;
+  }
+
+  private static ThreadFactory getThreadFactory(String factoryName) {
    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + "-%d").build();
+  }
+
+  @Override
+  public void close() {
+    if (checkTgtExecutor != null) {
+      checkTgtExecutor.shutdown();

Review Comment:
   The executor shutdown in close() is not waiting for tasks to complete. This 
could cause the TGT refresh task to be abruptly terminated mid-execution, 
potentially leaving the UGI in an inconsistent state. Consider using 
shutdownNow() to interrupt running tasks or awaitTermination() to wait for 
graceful completion, and handle any exceptions appropriately.
   ```suggestion
         checkTgtExecutor.shutdown();
         try {
           if (!checkTgtExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
            LOG.warn("TGT refresh executor did not terminate in the allotted time. Forcing shutdown.");
            checkTgtExecutor.shutdownNow();
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for TGT refresh executor to terminate. Forcing shutdown.", e);
           checkTgtExecutor.shutdownNow();
           Thread.currentThread().interrupt();
         }
   ```
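   The suggested shutdown sequence can be exercised without any Hadoop dependency. This stand-alone sketch (class and method names are illustrative, not from the PR) applies the same shutdown → awaitTermination → shutdownNow pattern to a `ScheduledThreadPoolExecutor` running a periodic task, like the TGT-refresh task in `KerberosClient`:

   ```java
   import java.util.concurrent.ScheduledThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   public class GracefulShutdownDemo {
     // The pattern from the review: stop accepting new tasks, wait briefly for
     // in-flight tasks, then force termination if the wait times out.
     static boolean shutdownGracefully(ScheduledThreadPoolExecutor executor, long waitSeconds) {
       executor.shutdown();
       try {
         if (!executor.awaitTermination(waitSeconds, TimeUnit.SECONDS)) {
           executor.shutdownNow(); // interrupt anything still running
           return false;
         }
         return true;
       } catch (InterruptedException e) {
         executor.shutdownNow();
         Thread.currentThread().interrupt(); // preserve the interrupt status
         return false;
       }
     }

     public static void main(String[] args) {
       ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
       // Simulates the periodic TGT-refresh task; by default a ScheduledThreadPoolExecutor
       // cancels pending periodic tasks on shutdown(), so termination is quick here.
       executor.scheduleAtFixedRate(() -> {}, 0, 1, TimeUnit.SECONDS);
       System.out.println("terminated cleanly: " + shutdownGracefully(executor, 5));
     }
   }
   ```

   Note that restoring the interrupt flag in the `catch` block matters: `close()` may be called from a thread whose caller also checks interruption.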



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.hadoop.conf.Configuration;
+
+public class AuthenticationConfig extends Config {
+
+  // The key for the authentication type, currently we support Kerberos and simple
+  public static final String AUTH_TYPE_KEY = "authentication.type";
+
+  public static final String IMPERSONATION_ENABLE_KEY = "authentication.impersonation-enable";
+
+  enum AuthenticationType {
+    SIMPLE,
+    KERBEROS;
+  }
+
+  public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false;
+
+  public AuthenticationConfig(Properties properties, Configuration configuration) {
+    super(false);
+    loadFromHdfsConfiguration(configuration);
+    loadFromMap((Map) properties, k -> true);

Review Comment:
   This raw type cast is unsafe and should be replaced with proper generic 
typing. The cast of Properties (which extends Hashtable<Object,Object>) to Map 
without proper type parameters can lead to ClassCastException at runtime if the 
properties contain non-String keys or values.
   ```suggestion
       // Convert Properties to Map<String, String> to ensure type safety
       Map<String, String> stringMap = new java.util.HashMap<>();
       for (String name : properties.stringPropertyNames()) {
         stringMap.put(name, properties.getProperty(name));
       }
       loadFromMap(stringMap, k -> true);
   ```
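   The hazard this comment describes is reproducible with plain `java.util.Properties`. The stand-alone sketch below (no Gravitino types; the class name is illustrative) shows the `stringPropertyNames()` conversion from the suggestion and why it is safer than a raw `(Map)` cast — `Properties` extends `Hashtable<Object,Object>`, so it can legally hold non-String entries:

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Properties;

   public class PropertiesToMapDemo {
     // Safe conversion: stringPropertyNames() only returns entries whose key and
     // value are both Strings, and it also walks the Properties defaults chain.
     static Map<String, String> toStringMap(Properties properties) {
       Map<String, String> stringMap = new HashMap<>();
       for (String name : properties.stringPropertyNames()) {
         stringMap.put(name, properties.getProperty(name));
       }
       return stringMap;
     }

     public static void main(String[] args) {
       Properties props = new Properties();
       props.setProperty("authentication.type", "kerberos");
       props.put("not-a-string", 42); // legal: Properties extends Hashtable<Object,Object>

       Map<String, String> safe = toStringMap(props);
       System.out.println(safe); // only the String-valued entry survives
       // A raw cast like (Map) props would instead expose the Integer value and
       // fail with ClassCastException the moment a caller reads it as a String.
     }
   }
   ```

   A side benefit of `stringPropertyNames()` over iterating `entrySet()` is that it includes defaults supplied via the `Properties(Properties defaults)` constructor.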



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt"));
+      }
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              loginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return loginUgi;
+  }
+
+  public File saveKeyTabFileFromUri(String path) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS");
+
+    File keytabsDir = new File("keytabs");
+    if (!keytabsDir.exists()) {
+      keytabsDir.mkdir();
+    }
+    File keytabFile = new File(path);
+    keytabFile.deleteOnExit();
+    if (keytabFile.exists() && !keytabFile.delete()) {
+      throw new IllegalStateException(
+          String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath()));
+    }
+    int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec();
+    FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf);
+    return keytabFile;
+  }
+
+  private static ThreadFactory getThreadFactory(String factoryName) {
+    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + "-%d").build();
+  }
+
+  @Override
+  public void close() {
+    if (checkTgtExecutor != null) {
+      checkTgtExecutor.shutdown();
+    }

Review Comment:
   The close() method does not clean up the keytab file that was created in 
saveKeyTabFileFromUri(). This can lead to keytab files accumulating in the 
keytabs directory over time. The keytabFilePath field should be tracked and the 
file should be explicitly deleted during cleanup.
   ```suggestion
       }
       if (keytabFilePath != null) {
         File keytabFile = new File(keytabFilePath);
         if (keytabFile.exists() && !keytabFile.delete()) {
           LOG.warn("Failed to delete keytab file: {}", keytabFilePath);
         }
       }
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java:
##########
@@ -18,12 +18,80 @@
  */
 package org.apache.gravitino.hive.client;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+import org.apache.hadoop.security.UserGroupInformation;
 
 public class ProxyHiveClientImpl implements InvocationHandler {
+
+  private final HiveClient delegate;
+  private final UserGroupInformation ugi;
+
+  private ProxyHiveClientImpl(HiveClient delegate, UserGroupInformation ugi) {
+    this.delegate = delegate;
+    this.ugi = ugi;
+  }
+
+  /**
+   * Wraps a {@link HiveClient} so that all its methods are executed via {@link
+   * UserGroupInformation#doAs(PrivilegedExceptionAction)} of the current user.
+   *
+   * <p>Callers should ensure Kerberos has been configured and the login user is set appropriately
+   * (for example via keytab) before calling this method.
+   */
+  public static HiveClient createClient(
+      HiveClientClassLoader.HiveVersion version, UserGroupInformation ugi, Properties properties) {
+    try {
+      HiveClient client =
+          ugi.doAs(
+              (PrivilegedExceptionAction<HiveClient>)
+                  () ->
+                      HiveClientFactory.createHiveClientImpl(
+                          version, properties, Thread.currentThread().getContextClassLoader()));
+      return (HiveClient)
+          Proxy.newProxyInstance(
+              HiveClient.class.getClassLoader(),
+              new Class<?>[] {HiveClient.class},
+              new ProxyHiveClientImpl(client, ugi));
+
+    } catch (IOException | InterruptedException ex) {
+      throw new RuntimeException("Failed to create Kerberos Hive client", ex);
+    }
+  }
+
   @Override
-  public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
-    return null;
+  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+    try {
+      return ugi.doAs(
+          (PrivilegedExceptionAction<Object>)
+              () -> {
+                try {
+                  return method.invoke(delegate, args);
+                } catch (InvocationTargetException ite) {
+                  Throwable cause = ite.getCause();
+                  if (cause instanceof Exception) {
+                    throw (Exception) cause;
+                  }
+                  throw new RuntimeException(cause != null ? cause : ite);
+                }

Review Comment:
   The exception handling here catches InvocationTargetException and unwraps 
the cause, but doesn't handle the case where the cause might be an Error (as 
opposed to Exception). If the delegate method throws an Error, it will be 
wrapped in RuntimeException at line 81, which masks critical errors like 
OutOfMemoryError or StackOverflowError that should typically be propagated 
as-is. Consider adding a check to rethrow Errors directly.
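   A minimal, self-contained sketch of that unwrap-and-rethrow pattern, using a plain `Callable` as the delegate interface instead of the real `HiveClient` (names like `UnwrapExample`, `wrap`, and `thrownBy` are illustrative, not part of the PR):

   ```java
   import java.lang.reflect.InvocationHandler;
   import java.lang.reflect.InvocationTargetException;
   import java.lang.reflect.Proxy;
   import java.util.concurrent.Callable;

   public class UnwrapExample {

     /** Wraps a Callable in a proxy whose handler unwraps InvocationTargetException. */
     @SuppressWarnings("unchecked")
     static Callable<String> wrap(Callable<String> target) {
       InvocationHandler handler =
           (proxy, method, args) -> {
             try {
               return method.invoke(target, args);
             } catch (InvocationTargetException ite) {
               Throwable cause = ite.getCause();
               if (cause instanceof Error) {
                 throw (Error) cause; // propagate OutOfMemoryError etc. untouched
               }
               if (cause instanceof Exception) {
                 throw (Exception) cause;
               }
               throw new RuntimeException(cause != null ? cause : ite);
             }
           };
       return (Callable<String>)
           Proxy.newProxyInstance(
               Callable.class.getClassLoader(), new Class<?>[] {Callable.class}, handler);
     }

     /** Returns the simple name of whatever the proxied call throws, or "nothing". */
     static String thrownBy(Callable<String> target) {
       try {
         wrap(target).call();
         return "nothing";
       } catch (Throwable t) {
         return t.getClass().getSimpleName();
       }
     }

     public static void main(String[] args) {
       // An Error thrown by the delegate surfaces as the same Error, not a RuntimeException.
       System.out.println(thrownBy(() -> { throw new OutOfMemoryError("simulated"); }));
     }
   }
   ```

   The same `instanceof Error` branch would slot directly into the `InvocationTargetException` handler above, before the `instanceof Exception` check.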



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.hadoop.conf.Configuration;
+
+public class KerberosConfig extends AuthenticationConfig {
+  public static final String KEY_TAB_URI_KEY = "authentication.kerberos.keytab-uri";
+
+  public static final String PRINCIPAL_KEY = "authentication.kerberos.principal";
+
+  public static final String CHECK_INTERVAL_SEC_KEY = "authentication.kerberos.check-interval-sec";
+
+  public static final String FETCH_TIMEOUT_SEC_KEY =
+      "authentication.kerberos.keytab-fetch-timeout-sec";
+
+  public static final ConfigEntry<String> PRINCIPAL_ENTRY =
+      new ConfigBuilder(PRINCIPAL_KEY)
+          .doc("The principal of the Kerberos connection")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<String> KEYTAB_ENTRY =
+      new ConfigBuilder(KEY_TAB_URI_KEY)
+          .doc("The keytab of the Kerberos connection")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<Integer> CHECK_INTERVAL_SEC_ENTRY =
+      new ConfigBuilder(CHECK_INTERVAL_SEC_KEY)
+          .doc("The check interval of the Kerberos connection for Hudi catalog")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .intConf()
+          .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(60);
+
+  public static final ConfigEntry<Integer> FETCH_TIMEOUT_SEC_ENTRY =
+      new ConfigBuilder(FETCH_TIMEOUT_SEC_KEY)
+          .doc("The fetch timeout of the Kerberos connection")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .intConf()
+          .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(60);
+
+  public KerberosConfig(Properties properties, Configuration configuration) {
+    super(properties, configuration);
+    loadFromMap((Map) properties, k -> true);

Review Comment:
   This raw type cast is unsafe and should be replaced with proper generic 
typing. The cast of Properties (which extends Hashtable<Object,Object>) to Map 
without proper type parameters can lead to ClassCastException at runtime if the 
properties contain non-String keys or values.
   ```suggestion
       Map<String, String> stringMap = new java.util.HashMap<>();
       for (String name : properties.stringPropertyNames()) {
         stringMap.put(name, properties.getProperty(name));
       }
       loadFromMap(stringMap, k -> true);
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java:
##########
@@ -56,10 +64,18 @@ public final class HiveClientFactory {
   public HiveClientFactory(Properties properties, String id) {
     Preconditions.checkArgument(properties != null, "Properties cannot be null");
     this.properties = properties;
+    this.keytabPath = String.format("keytabs/gravitino-%s-keytab", id);
 
     try {
       this.hadoopConf = new Configuration();
       updateConfigurationFromProperties(properties, hadoopConf);
+
+      initKerberosIfNecessary();
+      if (enableKerberos) {
+        // set hive client to kerberos client for retrieving delegation token
+        HiveClient client = createHiveClient();
+        kerberosClient.setHiveClient(client);
+      }

Review Comment:
   There's a potential circular dependency issue during initialization. The 
constructor calls initKerberosIfNecessary() which calls createHiveClient() to 
set up the kerberosClient.hiveClient. However, createHiveClient() can 
potentially try to use the kerberosClient (if enableImpersonation is true) 
before it's fully initialized. This could lead to null pointer exceptions or 
incomplete initialization. The initialization sequence should be restructured 
to avoid this circular dependency.
   ```suggestion
      // Removed setting HiveClient on KerberosClient here to avoid circular dependency.
      // The HiveClient should be set on the KerberosClient after full initialization.
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;

Review Comment:
   The realLoginUgi field is declared as volatile but is accessed and modified 
without synchronization in both login() (line 124) and loginProxyUser() (line 
71-72). While volatile ensures visibility, it doesn't provide atomicity for 
compound operations. The loginProxyUser() method reads realLoginUgi multiple 
times, and if login() is called concurrently from another thread, it could see 
an inconsistent state. Consider using synchronization or ensure these methods 
are never called concurrently.
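   The shape of that fix can be sketched with a stand-in `Session` type in place of the real `UserGroupInformation` (the class and field names below are illustrative, not the PR's actual code): synchronize the login path, and read the volatile field once into a local so every later use in a method sees the same snapshot.

   ```java
   public class LoginHolder {
     // Stand-in for UserGroupInformation in this sketch
     static final class Session {
       final String userName;
       Session(String userName) { this.userName = userName; }
     }

     private volatile Session realLoginSession;

     // Synchronizing login() keeps the "create then publish" sequence atomic
     // with respect to any other synchronized path that swaps the session.
     public synchronized Session login(String principal) {
       Session s = new Session(principal);
       realLoginSession = s;
       return s;
     }

     public Session loginProxyUser(String currentUser) {
       // Read the volatile field once into a local; all later uses in this
       // method refer to the same snapshot even if login() runs concurrently.
       Session snapshot = realLoginSession;
       if (snapshot == null || currentUser.equals(snapshot.userName)) {
         return snapshot;
       }
       // ... build the proxy session from the single snapshot ...
       return new Session(currentUser);
     }

     public static void main(String[] args) {
       LoginHolder holder = new LoginHolder();
       holder.login("hive/host@EXAMPLE.COM");
       System.out.println(holder.loginProxyUser("alice").userName);
     }
   }
   ```

   In `KerberosClient` this would mean assigning `realLoginUgi` to a local at the top of `loginProxyUser()` and using only that local afterwards.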



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt"));
+      }
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              loginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return loginUgi;
+  }
+
+  public File saveKeyTabFileFromUri(String path) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS");
+
+    File keytabsDir = new File("keytabs");
+    if (!keytabsDir.exists()) {
+      keytabsDir.mkdir();
+    }
+    File keytabFile = new File(path);
+    keytabFile.deleteOnExit();
+    if (keytabFile.exists() && !keytabFile.delete()) {
+      throw new IllegalStateException(
+          String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath()));
+    }
+    int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec();
+    FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf);
+    return keytabFile;

Review Comment:
   The keytab file should be created with restricted permissions (e.g., 0600) 
to prevent unauthorized access to sensitive Kerberos credentials. Currently, 
the file is created with default permissions, which may be too permissive. Use
Files.setPosixFilePermissions() or similar to restrict access to the owner only.
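   For example, along these lines (a sketch assuming a POSIX file system; `Files.setPosixFilePermissions` throws `UnsupportedOperationException` on non-POSIX file systems such as NTFS, and the class and method names here are illustrative):

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.attribute.PosixFilePermission;
   import java.nio.file.attribute.PosixFilePermissions;
   import java.util.Set;

   public class KeytabPermissions {

     /** Restricts the file to owner read/write only (0600) on POSIX file systems. */
     static void restrictToOwner(Path keytab) throws IOException {
       Set<PosixFilePermission> ownerOnly = PosixFilePermissions.fromString("rw-------");
       Files.setPosixFilePermissions(keytab, ownerOnly);
     }

     /** Creates a temp file, locks it down, and returns its permission string. */
     static String demo() {
       try {
         Path tmp = Files.createTempFile("keytab-demo", ".keytab");
         restrictToOwner(tmp);
         String perms = PosixFilePermissions.toString(Files.getPosixFilePermissions(tmp));
         Files.deleteIfExists(tmp);
         return perms;
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
     }

     public static void main(String[] args) {
       System.out.println(demo());
     }
   }
   ```

   In `saveKeyTabFileFromUri()` the `restrictToOwner` call would go right after `FetchFileUtils.fetchFileFromUri(...)` writes the file.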



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.hadoop.conf.Configuration;
+
+public class AuthenticationConfig extends Config {
+
+  // The key for the authentication type, currently we support Kerberos and simple
+  public static final String AUTH_TYPE_KEY = "authentication.type";
+
+  public static final String IMPERSONATION_ENABLE_KEY = "authentication.impersonation-enable";
+
+  enum AuthenticationType {
+    SIMPLE,
+    KERBEROS;
+  }
+
+  public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false;
+
+  public AuthenticationConfig(Properties properties, Configuration configuration) {
+    super(false);
+    loadFromHdfsConfiguration(configuration);
+    loadFromMap((Map) properties, k -> true);
+  }
+
+  private void loadFromHdfsConfiguration(Configuration configuration) {
+    String authType = configuration.get(HADOOP_SECURITY_AUTHENTICATION, "simple");
+    configMap.put(AUTH_TYPE_KEY, authType);
+  }
+
+  public static final ConfigEntry<String> AUTH_TYPE_ENTRY =
+      new ConfigBuilder(AUTH_TYPE_KEY)
+          .doc(
+              "The type of authentication for Hudi catalog, currently we only 
support simple and Kerberos")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .createWithDefault("simple");
+
+  public static final ConfigEntry<Boolean> ENABLE_IMPERSONATION_ENTRY =
+      new ConfigBuilder(IMPERSONATION_ENABLE_KEY)
+          .doc("Whether to enable impersonation for the Hudi catalog")

Review Comment:
   The documentation mentions "Hudi catalog" but this configuration is for the 
Hive catalog. This appears to be a copy-paste error from another catalog 
implementation.
   ```suggestion
                 "The type of authentication for Hive catalog, currently we 
only support simple and Kerberos")
             .version(ConfigConstants.VERSION_1_0_0)
             .stringConf()
             .createWithDefault("simple");
   
     public static final ConfigEntry<Boolean> ENABLE_IMPERSONATION_ENTRY =
         new ConfigBuilder(IMPERSONATION_ENABLE_KEY)
             .doc("Whether to enable impersonation for the Hive catalog")
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.hadoop.conf.Configuration;
+
+public class AuthenticationConfig extends Config {
+
+  // The key for the authentication type, currently we support Kerberos and simple
+  public static final String AUTH_TYPE_KEY = "authentication.type";
+
+  public static final String IMPERSONATION_ENABLE_KEY = "authentication.impersonation-enable";
+
+  enum AuthenticationType {
+    SIMPLE,
+    KERBEROS;
+  }
+
+  public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false;
+
+  public AuthenticationConfig(Properties properties, Configuration configuration) {
+    super(false);
+    loadFromHdfsConfiguration(configuration);
+    loadFromMap((Map) properties, k -> true);
+  }
+
+  private void loadFromHdfsConfiguration(Configuration configuration) {
+    String authType = configuration.get(HADOOP_SECURITY_AUTHENTICATION, "simple");
+    configMap.put(AUTH_TYPE_KEY, authType);
+  }
+
+  public static final ConfigEntry<String> AUTH_TYPE_ENTRY =
+      new ConfigBuilder(AUTH_TYPE_KEY)
+          .doc(
+              "The type of authentication for Hudi catalog, currently we only 
support simple and Kerberos")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .createWithDefault("simple");
+
+  public static final ConfigEntry<Boolean> ENABLE_IMPERSONATION_ENTRY =
+      new ConfigBuilder(IMPERSONATION_ENABLE_KEY)
+          .doc("Whether to enable impersonation for the Hudi catalog")

Review Comment:
   The documentation mentions "Hudi catalog" but this configuration is for the 
Hive catalog. This appears to be a copy-paste error from another catalog 
implementation.
   ```suggestion
             .doc("Whether to enable impersonation for the Hive catalog")
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt"));
+      }
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              loginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return loginUgi;
+  }
+
+  public File saveKeyTabFileFromUri(String path) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS");
+
+    File keytabsDir = new File("keytabs");
+    if (!keytabsDir.exists()) {
+      keytabsDir.mkdir();
+    }
+    File keytabFile = new File(path);
+    keytabFile.deleteOnExit();
+    if (keytabFile.exists() && !keytabFile.delete()) {
+      throw new IllegalStateException(
+          String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath()));
+    }
+    int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec();
+    FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf);
+    return keytabFile;
+  }

Review Comment:
   The public methods loginProxyUser(), login(), and saveKeyTabFileFromUri() 
lack Javadoc documentation. These are critical security-related methods that 
should have comprehensive documentation describing their purpose, parameters, 
return values, exceptions, and any thread-safety guarantees or usage 
constraints.
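
A hedged sketch of the kind of Javadoc this comment asks for, shown on a small standalone helper that mirrors the realm-defaulting step inside `loginProxyUser()`. The class and method names (`PrincipalNames`, `qualify`) are illustrative, not part of the PR:

```java
/**
 * Illustrative Javadoc sketch only: {@code PrincipalNames} and {@code qualify()}
 * are hypothetical names mirroring the realm-defaulting logic in the diff above.
 */
public class PrincipalNames {

  /**
   * Resolves the fully qualified Kerberos principal for {@code currentUser}.
   *
   * <p>If {@code currentUser} already carries a realm (contains {@code @}), it is
   * returned unchanged; otherwise {@code realm} is appended, matching the
   * realm-defaulting branch in {@code loginProxyUser()}.
   *
   * @param currentUser the short or fully qualified user name; must not be blank
   * @param realm the Kerberos realm taken from the configured principal
   * @return the principal in {@code user@REALM} form
   * @throws IllegalArgumentException if {@code currentUser} is blank
   */
  public static String qualify(String currentUser, String realm) {
    if (currentUser == null || currentUser.trim().isEmpty()) {
      throw new IllegalArgumentException("currentUser can't be blank");
    }
    return currentUser.contains("@") ? currentUser : currentUser + "@" + realm;
  }

  public static void main(String[] args) {
    System.out.println(qualify("alice", "EXAMPLE.COM")); // alice@EXAMPLE.COM
    System.out.println(qualify("bob@OTHER.COM", "EXAMPLE.COM")); // bob@OTHER.COM
  }
}
```

For the real methods, the same style would also state the thread-safety guarantees the comment mentions (e.g. whether `login()` may be called concurrently).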



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")

Review Comment:
   The @SuppressWarnings("null") annotation is misleading here. The 
Splitter.on('@').splitToList() method is guaranteed to never return null - it 
always returns a non-null list (though it might be empty or have fewer elements 
than expected). The real issue being suppressed is likely a different warning. 
This annotation should be removed or clarified with the actual warning being 
suppressed.
   ```suggestion
   
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FetchFileUtils {
+
+  private FetchFileUtils() {}
+
+  public static void fetchFileFromUri(
+      String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException {
+    try {
+      URI uri = new URI(fileUri);
+      String scheme = java.util.Optional.ofNullable(uri.getScheme()).orElse("file");
+
+      switch (scheme) {
+        case "http":
+        case "https":
+        case "ftp":
+          FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000);
+          break;
+
+        case "file":
+          Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath());
+          break;
+
+        case "hdfs":

Review Comment:
   There's a potential path traversal vulnerability here. The URI path is used 
directly without validation, which could allow an attacker to access files 
outside the intended directory by using path traversal sequences like "../". 
Input validation should be added to ensure the resolved path stays within 
expected boundaries.
   ```suggestion
           case "hdfs":
            // Prevent path traversal: ensure destFile is within its parent directory
            File destCanonical = destFile.getCanonicalFile();
            File parentCanonical = destFile.getParentFile().getCanonicalFile();
            if (!destCanonical.getPath().startsWith(parentCanonical.getPath() + File.separator)) {
              throw new SecurityException("Path traversal attempt detected: destination file is outside the allowed directory.");
            }
             }
   ```
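
As a standalone illustration of the containment check the suggestion describes, a minimal sketch (the class name `PathContainment` is hypothetical): canonicalize both paths so symlinks and `..` segments are resolved before comparing prefixes.

```java
import java.io.File;
import java.io.IOException;

public class PathContainment {

  /**
   * Returns true only if {@code candidate} resolves (".." segments and symlinks
   * included) to a location inside {@code baseDir}.
   */
  static boolean isInside(File baseDir, File candidate) throws IOException {
    String base = baseDir.getCanonicalFile().getPath();
    String target = candidate.getCanonicalFile().getPath();
    return target.equals(base) || target.startsWith(base + File.separator);
  }

  public static void main(String[] args) throws IOException {
    File keytabs = new File("keytabs");
    // A plain file name stays inside the directory.
    System.out.println(isInside(keytabs, new File(keytabs, "hive.keytab"))); // true
    // "../" escapes it, so the check rejects the path.
    System.out.println(isInside(keytabs, new File(keytabs, "../etc/passwd"))); // false
  }
}
```

Note that comparing canonical paths (rather than raw strings) is what defeats `..` sequences and symlink tricks.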



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }

Review Comment:
   There's a logic error in the condition on line 71. The method returns 
realLoginUgi if either currentUser equals the realLoginUgi username OR if 
hiveClient is null. However, when hiveClient is null, user impersonation cannot 
work at all (as evidenced by the later getDelegationToken call), so this should 
be treated as an error condition rather than silently returning realLoginUgi. 
This could mask configuration or initialization issues.
   ```suggestion
         if (currentUser.equals(realLoginUgi.getUserName())) {
           return realLoginUgi;
         }
        Preconditions.checkState(hiveClient != null, "HiveClient is not set in KerberosClient");
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt"));
+      }
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              loginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return loginUgi;
+  }
+
+  public File saveKeyTabFileFromUri(String path) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS");
+
+    File keytabsDir = new File("keytabs");
+    if (!keytabsDir.exists()) {
+      keytabsDir.mkdir();
+    }
+    File keytabFile = new File(path);
+    keytabFile.deleteOnExit();

Review Comment:
   The method creates a keytab file but marks it for deletion on JVM exit with 
deleteOnExit(). This is problematic because: 1) The file needs to persist for 
the lifetime of the Kerberos client to support token refresh operations, 2) 
deleteOnExit() can cause memory leaks in long-running applications as the file 
paths accumulate in memory, and 3) The file may not be deleted if the JVM 
crashes. Consider implementing explicit cleanup in the close() method instead.
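
A minimal sketch of the explicit-cleanup alternative, assuming a `Closeable` holder that owns both the keytab file and the refresh executor. `ExplicitCleanup` is an illustrative name, not the PR's class:

```java
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;

/** Sketch: delete the keytab and stop the refresh executor in close(). */
public class ExplicitCleanup implements Closeable {
  private final File keytabFile;
  private final ScheduledThreadPoolExecutor executor;

  ExplicitCleanup(File keytabFile) {
    this.keytabFile = keytabFile;
    this.executor = new ScheduledThreadPoolExecutor(1);
  }

  @Override
  public void close() throws IOException {
    executor.shutdownNow(); // stop the TGT-refresh task before removing the keytab
    if (keytabFile.exists() && !keytabFile.delete()) {
      throw new IOException("Fail to delete keytab file " + keytabFile.getAbsolutePath());
    }
  }

  public static void main(String[] args) throws IOException {
    File f = File.createTempFile("sketch", ".keytab");
    try (ExplicitCleanup client = new ExplicitCleanup(f)) {
      // ... client in use; the keytab stays on disk for re-login ...
    }
    System.out.println(f.exists()); // false: cleaned up deterministically
  }
}
```

This keeps the file available for the client's whole lifetime (so `checkTGTAndReloginFromKeytab()` keeps working) and avoids the unbounded `deleteOnExit()` registry.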



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.hadoop.conf.Configuration;
+
+public class KerberosConfig extends AuthenticationConfig {
+  public static final String KEY_TAB_URI_KEY = "authentication.kerberos.keytab-uri";

Review Comment:
   The AuthenticationConfig and KerberosConfig classes are missing class-level 
Javadoc documentation. As public configuration classes that define the contract 
for Kerberos authentication settings, they should have comprehensive 
documentation explaining their purpose, usage, and relationship to other 
components.
   ```suggestion
   /**
    * Configuration class for Kerberos authentication settings.
    * <p>
    * This class defines the contract for configuring Kerberos authentication in Hive Metastore integrations.
    * It provides configuration keys and accessors for essential Kerberos parameters such as principal,
    * keytab location, check interval, and fetch timeout. These settings are used to establish and maintain
    * secure connections to services protected by Kerberos.
    * <p>
    * {@code KerberosConfig} extends {@link AuthenticationConfig}, inheriting general authentication
    * configuration capabilities and specializing them for Kerberos-specific requirements.
    * <p>
    * Typical usage involves constructing an instance with the relevant {@link java.util.Properties}
    * and Hadoop {@link org.apache.hadoop.conf.Configuration}, after which the configuration values
    * can be accessed via the provided getter methods.
    *
    * <p>Key configuration entries:
    * <ul>
    *   <li>{@code authentication.kerberos.principal} - The Kerberos principal name.</li>
    *   <li>{@code authentication.kerberos.keytab-uri} - The URI to the keytab file.</li>
    *   <li>{@code authentication.kerberos.check-interval-sec} - Interval (in seconds) to check ticket validity.</li>
    *   <li>{@code authentication.kerberos.keytab-fetch-timeout-sec} - Timeout (in seconds) for fetching the keytab.</li>
    * </ul>
    *
    * @see AuthenticationConfig
    */
   public class KerberosConfig extends AuthenticationConfig {
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt"));
+      }

Review Comment:
   The checkTgtExecutor is declared as volatile, but its initialization in the 
login() method is not properly synchronized. Lines 128-130 check if 
checkTgtExecutor is null and then create it, but this check-then-act pattern is 
not atomic even with volatile. In a multi-threaded environment, multiple 
threads could pass the null check simultaneously and create multiple executors. 
This should be protected with proper synchronization or use a thread-safe 
initialization pattern.
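
One thread-safe pattern that fixes this check-then-act race is double-checked locking on the volatile field; a minimal sketch (`SafeExecutorInit` is an illustrative name, not the PR's class):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SafeExecutorInit {
  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;

  /** Double-checked locking: at most one executor is ever created. */
  ScheduledThreadPoolExecutor executor() {
    ScheduledThreadPoolExecutor local = checkTgtExecutor;
    if (local == null) {
      synchronized (this) {
        local = checkTgtExecutor;
        if (local == null) {
          local = new ScheduledThreadPoolExecutor(1);
          checkTgtExecutor = local;
        }
      }
    }
    return local;
  }

  public static void main(String[] args) throws Exception {
    SafeExecutorInit holder = new SafeExecutorInit();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    Set<Object> seen = ConcurrentHashMap.newKeySet();
    for (int i = 0; i < 8; i++) {
      pool.submit(() -> seen.add(holder.executor())); // race to initialize
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println(seen.size()); // 1: every thread saw the same executor
    holder.executor().shutdownNow();
  }
}
```

Alternatively, since the field is only touched in `login()`, simply marking that method (or this block) `synchronized` would be the simpler fix.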



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");

Review Comment:
   The check on line 94 verifies that hiveClient is not null, but this check is 
redundant given the earlier check on line 71. If hiveClient is null at line 71, 
the method returns realLoginUgi immediately, so we can never reach line 94 with 
a null hiveClient. This Preconditions check should be removed or the logic 
should be refactored to avoid redundancy.
   ```suggestion
   
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+

Review Comment:
   The FetchFileUtils class is missing class-level Javadoc documentation. As a 
utility class handling sensitive file operations (keytab files), it should have 
documentation explaining its purpose, security considerations, and supported 
URI schemes.
   ```suggestion
   
   /**
    * Utility class for fetching files from various URI schemes to a local destination.
    * <p>
    * This class is primarily used to handle sensitive file operations, such as fetching
    * Kerberos keytab files, from different sources including local filesystems, HTTP(S),
    * FTP, and HDFS. It supports the following URI schemes:
    * <ul>
    *   <li><b>file</b>: Creates a symbolic link to the local file.</li>
    *   <li><b>http</b>, <b>https</b>, <b>ftp</b>: Downloads the file using the specified protocol.</li>
    *   <li><b>hdfs</b>: Copies the file from Hadoop Distributed File System.</li>
    * </ul>
    * <p>
    * <b>Security considerations:</b>
    * <ul>
    *   <li>Fetching sensitive files (e.g., keytabs) from remote or untrusted sources may expose credentials.</li>
    *   <li>Symbolic link creation (for <b>file</b> scheme) may have security implications if the destination is not trusted.</li>
    *   <li>Ensure proper permissions and validation of URIs to avoid path traversal or SSRF vulnerabilities.</li>
    * </ul>
    */
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FetchFileUtils {
+
+  private FetchFileUtils() {}
+
+  public static void fetchFileFromUri(
+      String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException {
+    try {

Review Comment:
   The fetchFileFromUri method lacks Javadoc documentation. Given that this 
method handles security-sensitive file operations and supports multiple URI 
schemes, it should have comprehensive documentation including parameter 
descriptions, return value, exceptions thrown, and security considerations for 
each URI scheme.
   ```suggestion
     /**
      * Fetches a file from the specified URI and stores it at the given destination file.
      * <p>
      * Supported URI schemes:
      * <ul>
      *   <li><b>http, https, ftp</b>: Downloads the file from the remote URL to {@code destFile} using the specified timeout.</li>
      *   <li><b>file</b>: Creates a symbolic link at {@code destFile} pointing to the local file specified by {@code fileUri}.</li>
      *   <li><b>hdfs</b>: Copies the file from HDFS to {@code destFile} using the provided Hadoop {@code Configuration}.</li>
      * </ul>
      * <p>
      * <b>Security considerations:</b>
      * <ul>
      *   <li>When using <b>http, https, or ftp</b>, ensure the source is trusted to avoid downloading malicious files.</li>
      *   <li>When using <b>file</b>, creating a symbolic link may introduce security risks such as symlink attacks or privilege escalation. Validate the source path and ensure proper permissions.</li>
      *   <li>When using <b>hdfs</b>, ensure the Hadoop configuration is secure and the source path is trusted.</li>
      * </ul>
      *
      * @param fileUri the URI of the source file. Supported schemes: http, https, ftp, file, hdfs.
      * @param destFile the destination {@link File} where the file will be stored or the symlink will be created.
      * @param timeout the timeout in seconds for network operations (applies to http, https, ftp).
      * @param conf the Hadoop {@link Configuration} used for HDFS operations; may be ignored for other schemes.
      * @throws java.io.IOException if an I/O error occurs during file operations.
      * @throws IllegalArgumentException if the URI scheme is unsupported or the URI is malformed.
      */
     public static void fetchFileFromUri(
         String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException {
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FetchFileUtils {
+
+  private FetchFileUtils() {}
+
+  public static void fetchFileFromUri(
+      String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException {
+    try {
+      URI uri = new URI(fileUri);
+      String scheme = java.util.Optional.ofNullable(uri.getScheme()).orElse("file");
+
+      switch (scheme) {
+        case "http":
+        case "https":
+        case "ftp":
+          FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000);
+          break;
+
+        case "file":
+          Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath());
+          break;
+
+        case "hdfs":
+          FileSystem.get(conf).copyToLocalFile(new Path(uri), new Path(destFile.toURI()));
+          break;
+

Review Comment:
   This code contradicts the validation at line 154 in 
KerberosClient.saveKeyTabFileFromUri which explicitly prevents HDFS URIs with 
the message "Keytab uri doesn't support to use HDFS". Either the validation 
should be removed or this HDFS case should be removed from the switch statement 
to maintain consistency.
   ```suggestion
   
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt"));
+      }
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              loginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return loginUgi;
+  }
+
+  public File saveKeyTabFileFromUri(String path) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS");

Review Comment:
   The error message uses a double negative which makes it unclear. Instead of 
"doesn't support to use HDFS", use "HDFS URIs are not supported for keytab 
files" or "does not support HDFS URIs".
   ```suggestion
           !keyTabUri.trim().startsWith("hdfs"), "HDFS URIs are not supported for keytab files");
   ```



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java:
##########
@@ -169,7 +185,22 @@ private HiveClient createHiveClientInternal(HiveClientClassLoader classloader) {
     ClassLoader origLoader = Thread.currentThread().getContextClassLoader();
     Thread.currentThread().setContextClassLoader(classloader);
     try {
-      return createHiveClientImpl(classloader.getHiveVersion(), properties, classloader);
+      if (!enableImpersonation) {
+        return createHiveClientImpl(classloader.getHiveVersion(), properties, classloader);
+      } else {
+        UserGroupInformation ugi;
+        if (!enableKerberos) {
+          ugi = UserGroupInformation.getCurrentUser();
+          if (!ugi.getUserName().equals(PrincipalUtils.getCurrentUserName())) {
+            ugi = UserGroupInformation.createProxyUser(PrincipalUtils.getCurrentUserName(), ugi);
+          }

Review Comment:
   The logic for handling impersonation when Kerberos is disabled lacks proper 
validation. If a user enables impersonation without Kerberos, the code creates 
a proxy user based on PrincipalUtils.getCurrentUserName(), but there's no 
validation that this is a legitimate use case. This could potentially be 
exploited for privilege escalation if impersonation is enabled in a 
non-Kerberos environment without proper authentication controls elsewhere.
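
   A simple guard at configuration-load time would make the intent explicit; the method and flag names below are hypothetical, not part of the PR:

```java
public class ImpersonationConfigGuard {
  // Hypothetical validation (names are illustrative): refuse impersonation
  // without Kerberos unless the deployer explicitly opts in, since simple
  // auth provides no authentication to back the proxy-user identity.
  public static void validate(
      boolean enableImpersonation, boolean enableKerberos, boolean allowSimpleAuthImpersonation) {
    if (enableImpersonation && !enableKerberos && !allowSimpleAuthImpersonation) {
      throw new IllegalArgumentException(
          "Impersonation without Kerberos must be explicitly enabled; "
              + "otherwise it may allow unauthenticated privilege escalation");
    }
  }
}
```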



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt"));
+      }
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              loginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return loginUgi;
+  }
+
+  public File saveKeyTabFileFromUri(String path) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS");
+
+    File keytabsDir = new File("keytabs");
+    if (!keytabsDir.exists()) {
+      keytabsDir.mkdir();
+    }
+    File keytabFile = new File(path);
+    keytabFile.deleteOnExit();
+    if (keytabFile.exists() && !keytabFile.delete()) {
+      throw new IllegalStateException(
+          String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath()));
+    }
+    int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec();
+    FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf);
+    return keytabFile;
+  }
+
+  private static ThreadFactory getThreadFactory(String factoryName) {
+    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + "-%d").build();
+  }
+
+  @Override
+  public void close() {
+    if (checkTgtExecutor != null) {
+      checkTgtExecutor.shutdown();
+    }
+  }
+
+  public void setHiveClient(HiveClient client) {
+    this.hiveClient = client;

Review Comment:
   The hiveClient field is declared volatile and is read on line 71 without synchronization, while it is set via setHiveClient() on line 183. volatile guarantees visibility of the assignment, but the null check followed by the later use is not atomic: if loginProxyUser() runs before setHiveClient() is called, it will still observe a null hiveClient. Either add proper synchronization around the lifecycle, or document that setHiveClient must be called before any call to loginProxyUser.
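
   Independent of documentation, reading the volatile field into a local exactly once removes the window between the null check and the use; an illustrative stand-alone sketch (the Client interface here is a stand-in for HiveClient, not the PR's type):

```java
public class VolatileSnapshotExample {
  // Stand-in for HiveClient; only the call used by loginProxyUser is modeled.
  interface Client {
    String getDelegationToken(String owner, String realUser);
  }

  private volatile Client client;

  public void setClient(Client c) {
    this.client = c;
  }

  // Read the volatile field once; every later use sees the same reference,
  // so the null check and the call cannot disagree about what they saw.
  public String tokenFor(String owner, String realUser) {
    Client snapshot = client;
    if (snapshot == null) {
      throw new IllegalStateException("setClient must be called before tokenFor");
    }
    return snapshot.getDelegationToken(owner, realUser);
  }
}
```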



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.hadoop.conf.Configuration;
+
+public class AuthenticationConfig extends Config {
+
+  // The key for the authentication type, currently we support Kerberos and simple

Review Comment:
   The AuthenticationConfig class is missing class-level Javadoc documentation. 
As a public configuration class that defines authentication settings and 
controls Kerberos vs simple authentication, it should have documentation 
explaining its purpose, the authentication types supported, and how it 
integrates with both catalog properties and Hadoop configuration.
   ```suggestion
   /**
    * Configuration class for authentication settings in the Hive Metastore catalog.
    * <p>
    * This class manages authentication configuration, supporting both Kerberos and simple authentication types.
    * It loads settings from both catalog properties and Hadoop configuration, allowing flexible integration.
    * <ul>
    *   <li>Supported authentication types: {@code simple}, {@code kerberos}</li>
    *   <li>Configuration sources: catalog properties and Hadoop {@code Configuration}</li>
    *   <li>Controls impersonation and authentication type for secure access</li>
    * </ul>
    * <p>
    * The authentication type is determined by the {@code authentication.type} property, which defaults to {@code simple}
    * if not specified. When Kerberos is enabled, additional settings such as impersonation can be configured.
    */
   public class AuthenticationConfig extends Config {
   ```
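
   The "defaults to {@code simple}" behavior described above can be sketched with plain `java.util.Properties` (no Gravitino dependencies); the key name `authentication.type` is taken from the review text, while the class and method names here are illustrative, not the actual `AuthenticationConfig` API:

   ```java
   import java.util.Properties;

   public class AuthTypeDefaultDemo {
     // Key name taken from the review comment; the real constant lives in AuthenticationConfig.
     static final String AUTH_TYPE_KEY = "authentication.type";
     static final String DEFAULT_AUTH_TYPE = "simple";

     // Resolve the authentication type, falling back to "simple" when the key is unset.
     static String resolveAuthType(Properties catalogProps) {
       return catalogProps.getProperty(AUTH_TYPE_KEY, DEFAULT_AUTH_TYPE);
     }

     public static void main(String[] args) {
       Properties unset = new Properties();
       Properties kerberos = new Properties();
       kerberos.setProperty(AUTH_TYPE_KEY, "kerberos");

       System.out.println(resolveAuthType(unset));    // simple
       System.out.println(resolveAuthType(kerberos)); // kerberos
     }
   }
   ```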



##########
catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FetchFileUtils {
+
+  private FetchFileUtils() {}
+
+  public static void fetchFileFromUri(
+      String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException {
+    try {
+      URI uri = new URI(fileUri);
+      String scheme = java.util.Optional.ofNullable(uri.getScheme()).orElse("file");
+
+      switch (scheme) {
+        case "http":
+        case "https":
+        case "ftp":
+          FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000);
+          break;
+
+        case "file":
+          Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath());

Review Comment:
   Creating a symbolic link for local file URIs is fragile: if the source file is deleted or modified, the symlink dangles or silently picks up the changed content. For security and reliability, copying the file content is safer than creating a symbolic link. Symlink creation can also fail outright due to filesystem permissions or lack of platform support (for example, Windows without administrator privileges).
   ```suggestion
             FileUtils.copyFile(new File(uri.getPath()), destFile);
   ```
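
   The review's point can be demonstrated with plain `java.nio.file` (no Hadoop or commons-io needed): a copy keeps its content after the source disappears, whereas a symlink would dangle. Class and method names below are illustrative only:

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   public class LocalFetchDemo {
     // Copy the source into place; unlike a symlink, the destination keeps its
     // content even if the source file is later deleted or modified.
     static void fetchLocalFile(Path source, Path dest) throws IOException {
       Files.copy(source, dest, StandardCopyOption.REPLACE_EXISTING);
     }

     public static void main(String[] args) throws IOException {
       Path src = Files.createTempFile("keytab-src", ".tmp");
       Path dst = Files.createTempDirectory("fetch-demo").resolve("dest.tmp");
       Files.writeString(src, "secret-material");

       fetchLocalFile(src, dst);
       Files.delete(src); // deleting the source does not affect the copy

       System.out.println(Files.readString(dst)); // secret-material
     }
   }
   ```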



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
