This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch internal-main
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 9b2a26be5f0d1da07fa16409bf670bd29d73a7c9
Author: Yuhui <[email protected]>
AuthorDate: Thu Dec 11 16:04:16 2025 +0800

    [#9456] feat(hive-catalog): Support Kerberos authentication and user 
impersonation in the HiveClient (#9458)
    
    ### What changes were proposed in this pull request?
    
    Support Kerberos authentication and user impersonation in the HiveClient
    
    ### Why are the changes needed?
    
    Fix: #9456
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    UT and IT
---
 .../gravitino/hive/client/HiveClientFactory.java   |  72 +++++++-
 .../gravitino/hive/client/ProxyHiveClientImpl.java |  63 ++++++-
 .../hive/kerberos/AuthenticationConfig.java        |  83 +++++++++
 .../gravitino/hive/kerberos/FetchFileUtils.java    |  63 +++++++
 .../gravitino/hive/kerberos/KerberosClient.java    | 197 +++++++++++++++++++++
 .../gravitino/hive/kerberos/KerberosConfig.java    |  93 ++++++++++
 6 files changed, 560 insertions(+), 11 deletions(-)

diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java
index 4442125d9e..39ceefd240 100644
--- 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java
@@ -30,6 +30,9 @@ import java.lang.reflect.Method;
 import java.util.Properties;
 import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.hive.kerberos.AuthenticationConfig;
+import org.apache.gravitino.hive.kerberos.KerberosClient;
+import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -38,17 +41,23 @@ import org.slf4j.LoggerFactory;
 public final class HiveClientFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveClientFactory.class);
 
+  public static final String GRAVITINO_KEYTAB_FORMAT = 
"keytabs/gravitino-%s-keytab";
+
   // Remember which Hive backend classloader worked successfully for this 
factory.
   private volatile HiveClientClassLoader backendClassLoader;
   private final Object classLoaderLock = new Object();
 
-  @SuppressWarnings("UnusedVariable")
-  private final Configuration hadoopConf;
+  private boolean enableKerberos;
+  private boolean enableImpersonation = false;
+  private KerberosClient kerberosClient;
 
+  private final Configuration hadoopConf;
   private final Properties properties;
+  private final String keytabPath;
 
   /**
-   * Creates a {@link HiveClientFactory} bound to the given configuration 
properties.
+   * Creates a {@link HiveClientFactory} bound to the 
given configuration
+   * properties.
    *
    * @param properties Hive client configuration, must not be null.
    * @param id An identifier for this factory instance.
@@ -56,10 +65,18 @@ public final class HiveClientFactory {
   public HiveClientFactory(Properties properties, String id) {
     Preconditions.checkArgument(properties != null, "Properties cannot be 
null");
     this.properties = properties;
+    this.keytabPath = String.format(GRAVITINO_KEYTAB_FORMAT, id);
 
     try {
       this.hadoopConf = new Configuration();
       updateConfigurationFromProperties(properties, hadoopConf);
+
+      initKerberosIfNecessary();
+      if (enableKerberos) {
+        // set hive client to kerberos client for retrieving delegation token
+        HiveClient client = createHiveClient();
+        kerberosClient.setHiveClient(client);
+      }
     } catch (Exception e) {
       throw new RuntimeException("Failed to initialize HiveClientFactory", e);
     }
@@ -169,7 +186,23 @@ public final class HiveClientFactory {
     ClassLoader origLoader = Thread.currentThread().getContextClassLoader();
     Thread.currentThread().setContextClassLoader(classloader);
     try {
-      return createHiveClientImpl(classloader.getHiveVersion(), properties, 
classloader);
+      if (enableImpersonation) {
+        UserGroupInformation ugi;
+        if (enableKerberos) {
+          ugi = 
kerberosClient.loginProxyUser(PrincipalUtils.getCurrentUserName());
+        } else {
+          ugi = UserGroupInformation.getCurrentUser();
+          if (!ugi.getUserName().equals(PrincipalUtils.getCurrentUserName())) {
+            ugi = 
UserGroupInformation.createProxyUser(PrincipalUtils.getCurrentUserName(), ugi);
+          }
+        }
+        return createProxyHiveClientImpl(
+            classloader.getHiveVersion(), properties, ugi, classloader);
+
+      } else {
+        return createHiveClientImpl(classloader.getHiveVersion(), properties, 
classloader);
+      }
+
     } catch (Exception e) {
       throw HiveExceptionConverter.toGravitinoException(
           e,
@@ -180,18 +213,39 @@ public final class HiveClientFactory {
     }
   }
 
+  private void initKerberosIfNecessary() {
+    try {
+      AuthenticationConfig authenticationConfig = new 
AuthenticationConfig(properties, hadoopConf);
+      enableKerberos = authenticationConfig.isKerberosAuth();
+      enableImpersonation = authenticationConfig.isImpersonationEnable();
+      if (!enableKerberos) {
+        return;
+      }
+
+      kerberosClient = new KerberosClient(properties, hadoopConf, true, 
keytabPath);
+      kerberosClient.login();
+
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize kerberos client", e);
+    }
+  }
+
   /** Release resources held by this factory. */
   public void close() {
-    synchronized (classLoaderLock) {
-      try {
+    try {
+      if (kerberosClient != null) {
+        kerberosClient.close();
+        kerberosClient = null;
+      }
+
+      synchronized (classLoaderLock) {
         if (backendClassLoader != null) {
           backendClassLoader.close();
           backendClassLoader = null;
         }
-
-      } catch (Exception e) {
-        LOG.warn("Failed to close HiveClientFactory", e);
       }
+    } catch (Exception e) {
+      LOG.warn("Failed to close HiveClientFactory", e);
     }
   }
 }
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java
index c62c394a04..6363148cdf 100644
--- 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java
@@ -18,12 +18,71 @@
  */
 package org.apache.gravitino.hive.client;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+import org.apache.hadoop.security.UserGroupInformation;
 
+/**
+ * A {@link HiveClient} proxy that executes all methods as a given user via 
{@link
+ * UserGroupInformation#doAs(PrivilegedExceptionAction)}.
+ */
 public class ProxyHiveClientImpl implements InvocationHandler {
+
+  private final HiveClient delegate;
+  private final UserGroupInformation ugi;
+
+  private ProxyHiveClientImpl(HiveClient delegate, UserGroupInformation ugi) {
+    this.delegate = delegate;
+    this.ugi = ugi;
+  }
+
+  /**
+   * Wraps a {@link HiveClient} so that all its methods are executed via {@link
+   * UserGroupInformation#doAs(PrivilegedExceptionAction)} of the given {@code ugi}.
+   *
+   * <p>Callers should ensure Kerberos has been configured and the login user 
is set appropriately
+   * (for example via keytab) before calling this method.
+   */
+  public static HiveClient createClient(
+      HiveClientClassLoader.HiveVersion version, UserGroupInformation ugi, 
Properties properties) {
+    try {
+      HiveClient client =
+          ugi.doAs(
+              (PrivilegedExceptionAction<HiveClient>)
+                  () ->
+                      HiveClientFactory.createHiveClientImpl(
+                          version, properties, 
Thread.currentThread().getContextClassLoader()));
+      return (HiveClient)
+          Proxy.newProxyInstance(
+              HiveClient.class.getClassLoader(),
+              new Class<?>[] {HiveClient.class},
+              new ProxyHiveClientImpl(client, ugi));
+
+    } catch (IOException | InterruptedException ex) {
+      throw new RuntimeException("Failed to create Kerberos Hive client", ex);
+    }
+  }
+
   @Override
-  public Object invoke(Object o, Method method, Object[] objects) throws 
Throwable {
-    return null;
+  public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+    try {
+      return ugi.doAs((PrivilegedExceptionAction<Object>) () -> 
method.invoke(delegate, args));
+    } catch (UndeclaredThrowableException e) {
+      Throwable innerException = e.getCause();
+      if (innerException instanceof PrivilegedActionException) {
+        throw innerException.getCause();
+      } else if (innerException instanceof InvocationTargetException) {
+        throw innerException.getCause();
+      } else {
+        throw innerException;
+      }
+    }
   }
 }
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java
new file mode 100644
index 0000000000..197bcc2dc3
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.hadoop.conf.Configuration;
+
+/** Configuration for authentication */
+public class AuthenticationConfig extends Config {
+
+  // The key for the authentication type, currently we support Kerberos and 
simple
+  public static final String AUTH_TYPE_KEY = "authentication.type";
+
+  public static final String IMPERSONATION_ENABLE_KEY = 
"authentication.impersonation-enable";
+
+  enum AuthenticationType {
+    SIMPLE,
+    KERBEROS;
+  }
+
+  public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false;
+
+  public AuthenticationConfig(Properties properties, Configuration 
configuration) {
+    super(false);
+    loadFromHdfsConfiguration(configuration);
+    loadFromMap((Map) properties, k -> true);
+  }
+
+  private void loadFromHdfsConfiguration(Configuration configuration) {
+    String authType = configuration.get(HADOOP_SECURITY_AUTHENTICATION, 
"simple");
+    configMap.put(AUTH_TYPE_KEY, authType);
+  }
+
+  public static final ConfigEntry<String> AUTH_TYPE_ENTRY =
+      new ConfigBuilder(AUTH_TYPE_KEY)
+          .doc("The type of authentication, currently we only support simple 
and Kerberos")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .createWithDefault("simple");
+
+  public static final ConfigEntry<Boolean> ENABLE_IMPERSONATION_ENTRY =
+      new ConfigBuilder(IMPERSONATION_ENABLE_KEY)
+          .doc("Whether to enable impersonation")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .booleanConf()
+          .createWithDefault(KERBEROS_DEFAULT_IMPERSONATION_ENABLE);
+
+  public String getAuthType() {
+    return get(AUTH_TYPE_ENTRY);
+  }
+
+  public boolean isKerberosAuth() {
+    return 
AuthenticationConfig.AuthenticationType.KERBEROS.name().equalsIgnoreCase(getAuthType());
+  }
+
+  public boolean isImpersonationEnable() {
+    return get(ENABLE_IMPERSONATION_ENTRY);
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java
new file mode 100644
index 0000000000..743b1d562c
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FetchFileUtils {
+
+  private FetchFileUtils() {}
+
+  public static void fetchFileFromUri(
+      String fileUri, File destFile, int timeout, Configuration conf) throws 
java.io.IOException {
+    try {
+      URI uri = new URI(fileUri);
+      String scheme = 
java.util.Optional.ofNullable(uri.getScheme()).orElse("file");
+
+      switch (scheme) {
+        case "http":
+        case "https":
+        case "ftp":
+          FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, 
timeout * 1000);
+          break;
+
+        case "file":
+          Files.createSymbolicLink(destFile.toPath(), new 
File(uri.getPath()).toPath());
+          break;
+
+        case "hdfs":
+          FileSystem.get(conf).copyToLocalFile(new Path(uri), new 
Path(destFile.toURI()));
+          break;
+
+        default:
+          throw new IllegalArgumentException(
+              String.format("The scheme '%s' is not supported", scheme));
+      }
+    } catch (URISyntaxException ue) {
+      throw new IllegalArgumentException("The uri of file has the wrong 
format", ue);
+    }
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java
new file mode 100644
index 0000000000..a5cfa7eddf
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static 
org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Kerberos client for Hive Metastore. */
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(KerberosClient.class);
+
+  private ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  /**
+   * Login proxy user for the given user name.
+   *
+   * @param currentUser The user name to login
+   * @return The UserGroupInformation for the proxy user
+   */
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      // hiveClient is null while the Kerberos client is still being initialized with the 
login user
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == 
null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, 
"");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      List<String> principalComponents = 
Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong 
format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, 
kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, 
realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, 
realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos 
Hive client", e);
+    }
+  }
+
+  /**
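+   * Logs in the Kerberos user from the principal and keytab file.
+   *
+   * @return the {@link UserGroupInformation} of the logged-in user
+   * @throws Exception if the principal is blank or malformed, or the keytab login fails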
+   */
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be 
blank");
+    List<String> principalComponents = 
Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, 
getThreadFactory("check-tgt"));
+      }
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              loginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return loginUgi;
+  }
+
+  public File saveKeyTabFileFromUri(String path) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri 
can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "HDFS URIs are not supported for 
keytab files");
+
+    File keytabsDir = new File("keytabs");
+    if (!keytabsDir.exists()) {
+      keytabsDir.mkdir();
+    }
+    File keytabFile = new File(path);
+    keytabFile.deleteOnExit();
+    if (keytabFile.exists() && !keytabFile.delete()) {
+      throw new IllegalStateException(
+          String.format("Fail to delete keytab file %s", 
keytabFile.getAbsolutePath()));
+    }
+    int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec();
+    FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, 
fetchKeytabFileTimeout, hadoopConf);
+    return keytabFile;
+  }
+
+  private static ThreadFactory getThreadFactory(String factoryName) {
+    return new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + 
"-%d").build();
+  }
+
+  @Override
+  public void close() {
+    if (checkTgtExecutor != null) {
+      checkTgtExecutor.shutdown();
+    }
+  }
+
+  public void setHiveClient(HiveClient client) {
+    this.hiveClient = client;
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java
new file mode 100644
index 0000000000..4495e0d7fc
--- /dev/null
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.hadoop.conf.Configuration;
+
+/** Kerberos authentication configuration */
+public class KerberosConfig extends AuthenticationConfig {
+  public static final String KEY_TAB_URI_KEY = 
"authentication.kerberos.keytab-uri";
+
+  public static final String PRINCIPAL_KEY = 
"authentication.kerberos.principal";
+
+  public static final String CHECK_INTERVAL_SEC_KEY = 
"authentication.kerberos.check-interval-sec";
+
+  public static final String FETCH_TIMEOUT_SEC_KEY =
+      "authentication.kerberos.keytab-fetch-timeout-sec";
+
+  public static final ConfigEntry<String> PRINCIPAL_ENTRY =
+      new ConfigBuilder(PRINCIPAL_KEY)
+          .doc("The principal of the Kerberos connection")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<String> KEYTAB_ENTRY =
+      new ConfigBuilder(KEY_TAB_URI_KEY)
+          .doc("The keytab of the Kerberos connection")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<Integer> CHECK_INTERVAL_SEC_ENTRY =
+      new ConfigBuilder(CHECK_INTERVAL_SEC_KEY)
+          .doc("The check interval of the Kerberos connection")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .intConf()
+          .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(60);
+
+  public static final ConfigEntry<Integer> FETCH_TIMEOUT_SEC_ENTRY =
+      new ConfigBuilder(FETCH_TIMEOUT_SEC_KEY)
+          .doc("The fetch timeout of the Kerberos connection")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .intConf()
+          .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(60);
+
+  public KerberosConfig(Properties properties, Configuration configuration) {
+    super(properties, configuration);
+    loadFromMap((Map) properties, k -> true);
+  }
+
+  public String getPrincipalName() {
+    return get(PRINCIPAL_ENTRY);
+  }
+
+  public String getKeytab() {
+    return get(KEYTAB_ENTRY);
+  }
+
+  public int getCheckIntervalSec() {
+    return get(CHECK_INTERVAL_SEC_ENTRY);
+  }
+
+  public int getFetchTimeoutSec() {
+    return get(FETCH_TIMEOUT_SEC_ENTRY);
+  }
+}

Reply via email to