Copilot commented on code in PR #9458: URL: https://github.com/apache/gravitino/pull/9458#discussion_r2609039998
########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.hive.kerberos; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class FetchFileUtils { + + private FetchFileUtils() {} + + public static void fetchFileFromUri( + String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException { + try { + URI uri = new URI(fileUri); + String scheme = java.util.Optional.ofNullable(uri.getScheme()).orElse("file"); + + switch (scheme) { + case "http": + case "https": + case "ftp": + FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000); + break; + + case "file": + Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath()); + break; + + case "hdfs": + FileSystem.get(conf).copyToLocalFile(new Path(uri), new Path(destFile.toURI())); + break; + + default: + throw new 
IllegalArgumentException( + String.format("Doesn't support the scheme %s", scheme)); Review Comment: The error message is awkwardly phrased. Instead of "Doesn't support", use "Unsupported" or "The scheme '{scheme}' is not supported". ```suggestion String.format("The scheme '%s' is not supported", scheme)); ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java: ########## @@ -18,12 +18,80 @@ */ package org.apache.gravitino.hive.client; +import java.io.IOException; import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.util.Properties; +import org.apache.hadoop.security.UserGroupInformation; Review Comment: The class ProxyHiveClientImpl is missing public API documentation. Given that this class implements a security-sensitive proxy pattern for Kerberos authentication, it should have comprehensive class-level Javadoc explaining its purpose, thread-safety characteristics, and usage constraints. The documentation should clarify when and how this proxy should be used versus the direct HiveClient implementation. ```suggestion /** * A dynamic proxy {@link InvocationHandler} for {@link HiveClient} that ensures all method calls are executed * under the context of a specific Kerberos principal using {@link UserGroupInformation#doAs(PrivilegedExceptionAction)}. * <p> * <b>Purpose:</b> This class is used to wrap a {@link HiveClient} so that all operations are performed with the * privileges of the specified Kerberos-authenticated user. This is essential in environments where secure * authentication and authorization are required for Hive Metastore access.
* <p> * <b>Thread-safety:</b> This class is thread-safe as long as the underlying {@link HiveClient} implementation is * thread-safe and the provided {@link UserGroupInformation} is not mutated after construction. The proxy itself * does not maintain any mutable state after initialization. * <p> * <b>Usage constraints:</b> * <ul> * <li>Instances must be created via the static {@link #createClient} method; direct instantiation is not supported.</li> * <li>Kerberos authentication must be properly configured, and the {@link UserGroupInformation} must represent a * valid, logged-in user (typically via keytab or ticket cache).</li> * <li>This proxy should be used when Hive Metastore operations must be performed as a specific Kerberos principal. * For non-secure or simple authentication scenarios, a direct {@link HiveClient} implementation may be used instead.</li> * </ul> * <p> * <b>Security note:</b> Improper use of this proxy (e.g., with an incorrect or uninitialized {@link UserGroupInformation}) * may result in authentication failures or privilege escalation. Always ensure the Kerberos context is valid before use. */ ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java: ########## @@ -38,14 +41,19 @@ public final class HiveClientFactory { private static final Logger LOG = LoggerFactory.getLogger(HiveClientFactory.class); + public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-hive-%s-keytab"; + Review Comment: The constant GRAVITINO_KEYTAB_FORMAT is defined but never used in this class. It appears the actual keytab path format is constructed differently in the constructor using String.format("keytabs/gravitino-%s-keytab", id). Either this constant should be used or removed to avoid confusion. 
```suggestion ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import java.util.Map; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; +import org.apache.hadoop.conf.Configuration; + +public class KerberosConfig extends AuthenticationConfig { + public static final String KEY_TAB_URI_KEY = "authentication.kerberos.keytab-uri"; + + public static final String PRINCIPAL_KEY = "authentication.kerberos.principal"; + + public static final String CHECK_INTERVAL_SEC_KEY = "authentication.kerberos.check-interval-sec"; + + public static final String FETCH_TIMEOUT_SEC_KEY = + "authentication.kerberos.keytab-fetch-timeout-sec"; + + public static final ConfigEntry<String> PRINCIPAL_ENTRY = + new ConfigBuilder(PRINCIPAL_KEY) + .doc("The principal of the Kerberos connection") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + 
.checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry<String> KEYTAB_ENTRY = + new ConfigBuilder(KEY_TAB_URI_KEY) + .doc("The keytab of the Kerberos connection") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry<Integer> CHECK_INTERVAL_SEC_ENTRY = + new ConfigBuilder(CHECK_INTERVAL_SEC_KEY) + .doc("The check interval of the Kerberos connection for Hudi catalog") Review Comment: The documentation mentions "Hudi catalog" but this configuration is for the Hive catalog. This appears to be a copy-paste error from another catalog implementation. ```suggestion .doc("The check interval of the Kerberos connection") ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + Review Comment: The KerberosClient class is missing class-level Javadoc documentation. Given the complexity and security-sensitive nature of this class (handling Kerberos authentication, delegation tokens, and user impersonation), comprehensive class-level documentation is essential to explain its purpose, lifecycle, thread-safety characteristics, and usage patterns. ```suggestion /** * KerberosClient handles Kerberos authentication, delegation token management, and user impersonation * for secure interactions with Hive Metastore and Hadoop services. 
* <p> * <b>Purpose:</b> * <ul> * <li>Performs Kerberos login using a principal and keytab file.</li> * <li>Manages delegation tokens for secure communication with Hive Metastore.</li> * <li>Supports user impersonation (proxy users) for multi-user environments.</li> * <li>Optionally refreshes Kerberos credentials periodically to maintain long-lived sessions.</li> * </ul> * <p> * <b>Lifecycle:</b> * <ul> * <li>Instantiate with configuration, Hadoop configuration, refresh flag, and keytab path.</li> * <li>Call {@link #login()} to perform Kerberos authentication and initialize the client.</li> * <li>Use {@link #loginProxyUser(String)} to obtain a proxy user UGI for impersonation.</li> * <li>Call {@link #close()} to release resources and stop background threads.</li> * </ul> * <p> * <b>Thread-safety:</b> * <ul> * <li>Most methods are thread-safe for concurrent use, but the underlying UGI and HiveClient * references are managed with volatile fields for safe publication.</li> * <li>Background credential refresh is managed by a single-threaded executor.</li> * </ul> * <p> * <b>Usage patterns:</b> * <pre> * KerberosClient client = new KerberosClient(props, hadoopConf, true, "/path/to/keytab"); * UserGroupInformation ugi = client.login(); * // Use ugi or create proxy users as needed * client.close(); * </pre> * <p> * <b>Security considerations:</b> * <ul> * <li>Ensure the keytab file is securely stored and access is restricted.</li> * <li>Principal and keytab must match the service identity configured in Hadoop/Hive.</li> * <li>Always call {@link #close()} to prevent credential leaks and stop background threads.</li> * </ul> */ ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private volatile ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private 
final Configuration hadoopConf; + private final boolean refreshCredentials; + private volatile UserGroupInformation realLoginUgi; + private final String keytabFilePath; + private volatile HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + public UserGroupInformation loginProxyUser(String currentUser) { + try { + if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } + + String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, ""); + String principal = conf.getProperty(PRINCIPAL_KEY, ""); + @SuppressWarnings("null") + List<String> principalComponents = Splitter.on('@').splitToList(principal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + String kerberosRealm = principalComponents.get(1); + + UserGroupInformation proxyUser; + final String finalPrincipalName; + if (!currentUser.contains("@")) { + finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm); + } else { + finalPrincipalName = currentUser; + } + + proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi); + + // Acquire HMS delegation token for the proxy user and attach it to UGI + Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient"); + String tokenStr = + hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName()); + + Token<DelegationTokenIdentifier> delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenSignature)); + proxyUser.addToken(delegationToken); + + 
return proxyUser; + } catch (Exception e) { + throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e); + } + } + + public UserGroupInformation login() throws Exception { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // Login + UserGroupInformation.setConfiguration(hadoopConf); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation loginUgi = UserGroupInformation.getLoginUser(); + realLoginUgi = loginUgi; + + // Refresh the cache if it's out of date. + if (refreshCredentials) { + if (checkTgtExecutor == null) { + checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + int checkInterval = kerberosConfig.getCheckIntervalSec(); + checkTgtExecutor.scheduleAtFixedRate( + () -> { + try { + loginUgi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + LOG.error("Fail to refresh ugi token: ", e); + } + }, + checkInterval, + checkInterval, + TimeUnit.SECONDS); + } + + return loginUgi; + } + + public File saveKeyTabFileFromUri(String path) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + String keyTabUri = kerberosConfig.getKeytab(); + Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank"); + Preconditions.checkArgument( + !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS"); + + File keytabsDir = new File("keytabs"); + if (!keytabsDir.exists()) { + keytabsDir.mkdir(); + } + File keytabFile = new File(path); + 
keytabFile.deleteOnExit(); + if (keytabFile.exists() && !keytabFile.delete()) { + throw new IllegalStateException( + String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath())); + } + int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec(); + FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf); + return keytabFile; + } + + private static ThreadFactory getThreadFactory(String factoryName) { + return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + "-%d").build(); + } + + @Override + public void close() { + if (checkTgtExecutor != null) { + checkTgtExecutor.shutdown(); Review Comment: The executor shutdown in close() is not waiting for tasks to complete. This could cause the TGT refresh task to be abruptly terminated mid-execution, potentially leaving the UGI in an inconsistent state. Consider using shutdownNow() to interrupt running tasks or awaitTermination() to wait for graceful completion, and handle any exceptions appropriately. ```suggestion checkTgtExecutor.shutdown(); try { if (!checkTgtExecutor.awaitTermination(30, TimeUnit.SECONDS)) { LOG.warn("TGT refresh executor did not terminate in the allotted time. Forcing shutdown."); checkTgtExecutor.shutdownNow(); } } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for TGT refresh executor to terminate. Forcing shutdown.", e); checkTgtExecutor.shutdownNow(); Thread.currentThread().interrupt(); } ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + +import java.util.Map; +import java.util.Properties; +import org.apache.gravitino.Config; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; +import org.apache.hadoop.conf.Configuration; + +public class AuthenticationConfig extends Config { + + // The key for the authentication type, currently we support Kerberos and simple + public static final String AUTH_TYPE_KEY = "authentication.type"; + + public static final String IMPERSONATION_ENABLE_KEY = "authentication.impersonation-enable"; + + enum AuthenticationType { + SIMPLE, + KERBEROS; + } + + public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false; + + public AuthenticationConfig(Properties properties, Configuration configuration) { + super(false); + loadFromHdfsConfiguration(configuration); + loadFromMap((Map) properties, k -> true); Review Comment: This raw type cast is unsafe and should be replaced with proper generic typing. The cast of Properties (which extends Hashtable<Object,Object>) to Map without proper type parameters can lead to ClassCastException at runtime if the properties contain non-String keys or values. 
```suggestion // Convert Properties to Map<String, String> to ensure type safety Map<String, String> stringMap = new java.util.HashMap<>(); for (String name : properties.stringPropertyNames()) { stringMap.put(name, properties.getProperty(name)); } loadFromMap(stringMap, k -> true); ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private volatile ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private final Configuration hadoopConf; + private final boolean refreshCredentials; + private volatile UserGroupInformation realLoginUgi; + private final String keytabFilePath; + private volatile HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + public UserGroupInformation loginProxyUser(String currentUser) { + try { + if 
(currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } + + String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, ""); + String principal = conf.getProperty(PRINCIPAL_KEY, ""); + @SuppressWarnings("null") + List<String> principalComponents = Splitter.on('@').splitToList(principal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + String kerberosRealm = principalComponents.get(1); + + UserGroupInformation proxyUser; + final String finalPrincipalName; + if (!currentUser.contains("@")) { + finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm); + } else { + finalPrincipalName = currentUser; + } + + proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi); + + // Acquire HMS delegation token for the proxy user and attach it to UGI + Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient"); + String tokenStr = + hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName()); + + Token<DelegationTokenIdentifier> delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenSignature)); + proxyUser.addToken(delegationToken); + + return proxyUser; + } catch (Exception e) { + throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e); + } + } + + public UserGroupInformation login() throws Exception { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // 
Login + UserGroupInformation.setConfiguration(hadoopConf); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation loginUgi = UserGroupInformation.getLoginUser(); + realLoginUgi = loginUgi; + + // Refresh the cache if it's out of date. + if (refreshCredentials) { + if (checkTgtExecutor == null) { + checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + int checkInterval = kerberosConfig.getCheckIntervalSec(); + checkTgtExecutor.scheduleAtFixedRate( + () -> { + try { + loginUgi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + LOG.error("Fail to refresh ugi token: ", e); + } + }, + checkInterval, + checkInterval, + TimeUnit.SECONDS); + } + + return loginUgi; + } + + public File saveKeyTabFileFromUri(String path) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + String keyTabUri = kerberosConfig.getKeytab(); + Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank"); + Preconditions.checkArgument( + !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS"); + + File keytabsDir = new File("keytabs"); + if (!keytabsDir.exists()) { + keytabsDir.mkdir(); + } + File keytabFile = new File(path); + keytabFile.deleteOnExit(); + if (keytabFile.exists() && !keytabFile.delete()) { + throw new IllegalStateException( + String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath())); + } + int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec(); + FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf); + return keytabFile; + } + + private static ThreadFactory getThreadFactory(String factoryName) { + return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + "-%d").build(); + } + + @Override + public void close() { + if (checkTgtExecutor != null) { + checkTgtExecutor.shutdown(); + } Review Comment: The close() 
method does not clean up the keytab file that was created in saveKeyTabFileFromUri(). This can lead to keytab files accumulating in the keytabs directory over time. The keytabFilePath field should be tracked and the file should be explicitly deleted during cleanup. ```suggestion } if (keytabFilePath != null) { File keytabFile = new File(keytabFilePath); if (keytabFile.exists() && !keytabFile.delete()) { LOG.warn("Failed to delete keytab file: {}", keytabFilePath); } } ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java: ########## @@ -18,12 +18,80 @@ */ package org.apache.gravitino.hive.client; +import java.io.IOException; import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.util.Properties; +import org.apache.hadoop.security.UserGroupInformation; public class ProxyHiveClientImpl implements InvocationHandler { + + private final HiveClient delegate; + private final UserGroupInformation ugi; + + private ProxyHiveClientImpl(HiveClient delegate, UserGroupInformation ugi) { + this.delegate = delegate; + this.ugi = ugi; + } + + /** + * Wraps a {@link HiveClient} so that all its methods are executed via {@link + * UserGroupInformation#doAs(PrivilegedExceptionAction)} of the current user. + * + * <p>Callers should ensure Kerberos has been configured and the login user is set appropriately + * (for example via keytab) before calling this method. 
+ */ + public static HiveClient createClient( + HiveClientClassLoader.HiveVersion version, UserGroupInformation ugi, Properties properties) { + try { + HiveClient client = + ugi.doAs( + (PrivilegedExceptionAction<HiveClient>) + () -> + HiveClientFactory.createHiveClientImpl( + version, properties, Thread.currentThread().getContextClassLoader())); + return (HiveClient) + Proxy.newProxyInstance( + HiveClient.class.getClassLoader(), + new Class<?>[] {HiveClient.class}, + new ProxyHiveClientImpl(client, ugi)); + + } catch (IOException | InterruptedException ex) { + throw new RuntimeException("Failed to create Kerberos Hive client", ex); + } + } + @Override - public Object invoke(Object o, Method method, Object[] objects) throws Throwable { - return null; + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + try { + return ugi.doAs( + (PrivilegedExceptionAction<Object>) + () -> { + try { + return method.invoke(delegate, args); + } catch (InvocationTargetException ite) { + Throwable cause = ite.getCause(); + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw new RuntimeException(cause != null ? cause : ite); + } Review Comment: The exception handling here catches InvocationTargetException and unwraps the cause, but doesn't handle the case where the cause might be an Error (as opposed to Exception). If the delegate method throws an Error, it will be wrapped in RuntimeException at line 81, which masks critical errors like OutOfMemoryError or StackOverflowError that should typically be propagated as-is. Consider adding a check to rethrow Errors directly. ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import java.util.Map; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; +import org.apache.hadoop.conf.Configuration; + +public class KerberosConfig extends AuthenticationConfig { + public static final String KEY_TAB_URI_KEY = "authentication.kerberos.keytab-uri"; + + public static final String PRINCIPAL_KEY = "authentication.kerberos.principal"; + + public static final String CHECK_INTERVAL_SEC_KEY = "authentication.kerberos.check-interval-sec"; + + public static final String FETCH_TIMEOUT_SEC_KEY = + "authentication.kerberos.keytab-fetch-timeout-sec"; + + public static final ConfigEntry<String> PRINCIPAL_ENTRY = + new ConfigBuilder(PRINCIPAL_KEY) + .doc("The principal of the Kerberos connection") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry<String> KEYTAB_ENTRY = + new ConfigBuilder(KEY_TAB_URI_KEY) + .doc("The keytab of the Kerberos connection") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final 
ConfigEntry<Integer> CHECK_INTERVAL_SEC_ENTRY = + new ConfigBuilder(CHECK_INTERVAL_SEC_KEY) + .doc("The check interval of the Kerberos connection for Hudi catalog") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(60); + + public static final ConfigEntry<Integer> FETCH_TIMEOUT_SEC_ENTRY = + new ConfigBuilder(FETCH_TIMEOUT_SEC_KEY) + .doc("The fetch timeout of the Kerberos connection") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(60); + + public KerberosConfig(Properties properties, Configuration configuration) { + super(properties, configuration); + loadFromMap((Map) properties, k -> true); Review Comment: This raw type cast is unsafe and should be replaced with proper generic typing. The cast of Properties (which extends Hashtable<Object,Object>) to Map without proper type parameters can lead to ClassCastException at runtime if the properties contain non-String keys or values. 
```suggestion Map<String, String> stringMap = new java.util.HashMap<>(); for (String name : properties.stringPropertyNames()) { stringMap.put(name, properties.getProperty(name)); } loadFromMap(stringMap, k -> true); ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java: ########## @@ -56,10 +64,18 @@ public final class HiveClientFactory { public HiveClientFactory(Properties properties, String id) { Preconditions.checkArgument(properties != null, "Properties cannot be null"); this.properties = properties; + this.keytabPath = String.format("keytabs/gravitino-%s-keytab", id); try { this.hadoopConf = new Configuration(); updateConfigurationFromProperties(properties, hadoopConf); + + initKerberosIfNecessary(); + if (enableKerberos) { + // set hive client to kerberos client for retrieving delegation token + HiveClient client = createHiveClient(); + kerberosClient.setHiveClient(client); + } Review Comment: There's a potential circular dependency issue during initialization. The constructor calls initKerberosIfNecessary() which calls createHiveClient() to set up the kerberosClient.hiveClient. However, createHiveClient() can potentially try to use the kerberosClient (if enableImpersonation is true) before it's fully initialized. This could lead to null pointer exceptions or incomplete initialization. The initialization sequence should be restructured to avoid this circular dependency. ```suggestion // Removed setting HiveClient on KerberosClient here to avoid circular dependency. // The HiveClient should be set on the KerberosClient after full initialization. ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
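The circular-dependency concern raised for HiveClientFactory above is typically resolved with two-phase initialization: construct both objects first, then wire the client into the Kerberos helper, so neither constructor observes a half-initialized peer. The sketch below is hypothetical (the interface and class names are illustrative, not the PR's API):

```java
// Hypothetical two-phase wiring sketch: construction and wiring are separate
// steps, so the helper never dereferences the client during construction.
public class TwoPhaseInitSketch {
  public interface Client {}

  public static class KerberosHelper {
    private Client client; // set after construction, never inside it

    public void setClient(Client client) {
      this.client = client;
    }

    public boolean isWired() {
      return client != null;
    }
  }

  public static KerberosHelper wire(Client client) {
    KerberosHelper helper = new KerberosHelper(); // phase 1: plain construction
    helper.setClient(client);                     // phase 2: wiring
    return helper;
  }

  public static void main(String[] args) {
    System.out.println(wire(new Client() {}).isWired());
  }
}
```

Applied to the factory, this means createHiveClient() runs only after the constructor has finished, and kerberosClient.setHiveClient(...) is the last step.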
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private volatile ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private 
final Configuration hadoopConf; + private final boolean refreshCredentials; + private volatile UserGroupInformation realLoginUgi; + private final String keytabFilePath; + private volatile HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + public UserGroupInformation loginProxyUser(String currentUser) { + try { + if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } + + String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, ""); + String principal = conf.getProperty(PRINCIPAL_KEY, ""); + @SuppressWarnings("null") + List<String> principalComponents = Splitter.on('@').splitToList(principal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + String kerberosRealm = principalComponents.get(1); + + UserGroupInformation proxyUser; + final String finalPrincipalName; + if (!currentUser.contains("@")) { + finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm); + } else { + finalPrincipalName = currentUser; + } + + proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi); + + // Acquire HMS delegation token for the proxy user and attach it to UGI + Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient"); + String tokenStr = + hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName()); + + Token<DelegationTokenIdentifier> delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenSignature)); + proxyUser.addToken(delegationToken); + + 
return proxyUser; + } catch (Exception e) { + throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e); + } + } + + public UserGroupInformation login() throws Exception { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // Login + UserGroupInformation.setConfiguration(hadoopConf); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation loginUgi = UserGroupInformation.getLoginUser(); + realLoginUgi = loginUgi; Review Comment: The realLoginUgi field is declared as volatile but is accessed and modified without synchronization in both login() (line 124) and loginProxyUser() (line 71-72). While volatile ensures visibility, it doesn't provide atomicity for compound operations. The loginProxyUser() method reads realLoginUgi multiple times, and if login() is called concurrently from another thread, it could see an inconsistent state. Consider using synchronization or ensure these methods are never called concurrently. ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private volatile ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private final Configuration hadoopConf; + private final boolean refreshCredentials; + private volatile UserGroupInformation realLoginUgi; + private final String keytabFilePath; + private volatile HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + 
Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + public UserGroupInformation loginProxyUser(String currentUser) { + try { + if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } + + String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, ""); + String principal = conf.getProperty(PRINCIPAL_KEY, ""); + @SuppressWarnings("null") + List<String> principalComponents = Splitter.on('@').splitToList(principal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + String kerberosRealm = principalComponents.get(1); + + UserGroupInformation proxyUser; + final String finalPrincipalName; + if (!currentUser.contains("@")) { + finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm); + } else { + finalPrincipalName = currentUser; + } + + proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi); + + // Acquire HMS delegation token for the proxy user and attach it to UGI + Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient"); + String tokenStr = + hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName()); + + Token<DelegationTokenIdentifier> delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenSignature)); + proxyUser.addToken(delegationToken); + + return proxyUser; + } catch (Exception e) { + throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e); + } + } + + public UserGroupInformation login() throws Exception { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + 
// Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // Login + UserGroupInformation.setConfiguration(hadoopConf); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation loginUgi = UserGroupInformation.getLoginUser(); + realLoginUgi = loginUgi; + + // Refresh the cache if it's out of date. + if (refreshCredentials) { + if (checkTgtExecutor == null) { + checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + int checkInterval = kerberosConfig.getCheckIntervalSec(); + checkTgtExecutor.scheduleAtFixedRate( + () -> { + try { + loginUgi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + LOG.error("Fail to refresh ugi token: ", e); + } + }, + checkInterval, + checkInterval, + TimeUnit.SECONDS); + } + + return loginUgi; + } + + public File saveKeyTabFileFromUri(String path) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + String keyTabUri = kerberosConfig.getKeytab(); + Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank"); + Preconditions.checkArgument( + !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS"); + + File keytabsDir = new File("keytabs"); + if (!keytabsDir.exists()) { + keytabsDir.mkdir(); + } + File keytabFile = new File(path); + keytabFile.deleteOnExit(); + if (keytabFile.exists() && !keytabFile.delete()) { + throw new IllegalStateException( + String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath())); + } + int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec(); + 
FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf); + return keytabFile; Review Comment: The keytab file should be created with restricted permissions (e.g., 0600) to prevent unauthorized access to sensitive Kerberos credentials. Currently, the file is created with default permissions which may be too permissive. Use Files.setPosixFilePermissions() or similar to restrict access to the owner only. ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
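The permission hardening suggested in the review comment above might look like the following standalone sketch. It assumes a POSIX file system for the primary path; the File#setReadable/setWritable fallback for non-POSIX stores and the class name are illustrative, not part of the PR.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

// Hypothetical sketch: restrict a fetched keytab to owner read/write (0600).
public class KeytabPermissionsSketch {
  public static String restrict(File keytabFile) throws IOException {
    Set<PosixFilePermission> ownerOnly = PosixFilePermissions.fromString("rw-------");
    try {
      Files.setPosixFilePermissions(keytabFile.toPath(), ownerOnly);
    } catch (UnsupportedOperationException e) {
      // Non-POSIX file system: fall back to the coarser java.io.File API.
      keytabFile.setReadable(false, false);  // clear read for everyone
      keytabFile.setReadable(true, true);    // re-grant read to owner
      keytabFile.setWritable(false, false);  // clear write for everyone
      keytabFile.setWritable(true, true);    // re-grant write to owner
    }
    // Report the effective permission string, e.g. "rw-------" on POSIX.
    return PosixFilePermissions.toString(Files.getPosixFilePermissions(keytabFile.toPath()));
  }

  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("sketch-", "-keytab");
    tmp.deleteOnExit();
    System.out.println(restrict(tmp));
  }
}
```

In the PR's flow the call would sit immediately after FetchFileUtils.fetchFileFromUri(...) materializes the keytab.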
+ */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + +import java.util.Map; +import java.util.Properties; +import org.apache.gravitino.Config; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; +import org.apache.hadoop.conf.Configuration; + +public class AuthenticationConfig extends Config { + + // The key for the authentication type, currently we support Kerberos and simple + public static final String AUTH_TYPE_KEY = "authentication.type"; + + public static final String IMPERSONATION_ENABLE_KEY = "authentication.impersonation-enable"; + + enum AuthenticationType { + SIMPLE, + KERBEROS; + } + + public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false; + + public AuthenticationConfig(Properties properties, Configuration configuration) { + super(false); + loadFromHdfsConfiguration(configuration); + loadFromMap((Map) properties, k -> true); + } + + private void loadFromHdfsConfiguration(Configuration configuration) { + String authType = configuration.get(HADOOP_SECURITY_AUTHENTICATION, "simple"); + configMap.put(AUTH_TYPE_KEY, authType); + } + + public static final ConfigEntry<String> AUTH_TYPE_ENTRY = + new ConfigBuilder(AUTH_TYPE_KEY) + .doc( + "The type of authentication for Hudi catalog, currently we only support simple and Kerberos") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .createWithDefault("simple"); + + public static final ConfigEntry<Boolean> ENABLE_IMPERSONATION_ENTRY = + new ConfigBuilder(IMPERSONATION_ENABLE_KEY) + .doc("Whether to enable impersonation for the Hudi catalog") Review Comment: The documentation mentions "Hudi catalog" but this configuration is for the Hive catalog. This appears to be a copy-paste error from another catalog implementation. 
```suggestion "The type of authentication for Hive catalog, currently we only support simple and Kerberos") .version(ConfigConstants.VERSION_1_0_0) .stringConf() .createWithDefault("simple"); public static final ConfigEntry<Boolean> ENABLE_IMPERSONATION_ENTRY = new ConfigBuilder(IMPERSONATION_ENABLE_KEY) .doc("Whether to enable impersonation for the Hive catalog") ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + +import java.util.Map; +import java.util.Properties; +import org.apache.gravitino.Config; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; +import org.apache.hadoop.conf.Configuration; + +public class AuthenticationConfig extends Config { + + // The key for the authentication type, currently we support Kerberos and simple + public static final String AUTH_TYPE_KEY = "authentication.type"; + + public static final String IMPERSONATION_ENABLE_KEY = "authentication.impersonation-enable"; + + enum AuthenticationType { + SIMPLE, + KERBEROS; + } + + public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false; + + public AuthenticationConfig(Properties properties, Configuration configuration) { + super(false); + loadFromHdfsConfiguration(configuration); + loadFromMap((Map) properties, k -> true); + } + + private void loadFromHdfsConfiguration(Configuration configuration) { + String authType = configuration.get(HADOOP_SECURITY_AUTHENTICATION, "simple"); + configMap.put(AUTH_TYPE_KEY, authType); + } + + public static final ConfigEntry<String> AUTH_TYPE_ENTRY = + new ConfigBuilder(AUTH_TYPE_KEY) + .doc( + "The type of authentication for Hudi catalog, currently we only support simple and Kerberos") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .createWithDefault("simple"); + + public static final ConfigEntry<Boolean> ENABLE_IMPERSONATION_ENTRY = + new ConfigBuilder(IMPERSONATION_ENABLE_KEY) + .doc("Whether to enable impersonation for the Hudi catalog") Review Comment: The documentation mentions "Hudi catalog" but this configuration is for the Hive catalog. This appears to be a copy-paste error from another catalog implementation. 
```suggestion .doc("Whether to enable impersonation for the Hive catalog") ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private volatile ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private final Configuration hadoopConf; + private final boolean refreshCredentials; + private volatile UserGroupInformation realLoginUgi; + private final String keytabFilePath; + private volatile HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + public UserGroupInformation loginProxyUser(String currentUser) { + try { + if 
(currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } + + String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, ""); + String principal = conf.getProperty(PRINCIPAL_KEY, ""); + @SuppressWarnings("null") + List<String> principalComponents = Splitter.on('@').splitToList(principal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + String kerberosRealm = principalComponents.get(1); + + UserGroupInformation proxyUser; + final String finalPrincipalName; + if (!currentUser.contains("@")) { + finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm); + } else { + finalPrincipalName = currentUser; + } + + proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi); + + // Acquire HMS delegation token for the proxy user and attach it to UGI + Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient"); + String tokenStr = + hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName()); + + Token<DelegationTokenIdentifier> delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenSignature)); + proxyUser.addToken(delegationToken); + + return proxyUser; + } catch (Exception e) { + throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e); + } + } + + public UserGroupInformation login() throws Exception { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // 
Login + UserGroupInformation.setConfiguration(hadoopConf); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation loginUgi = UserGroupInformation.getLoginUser(); + realLoginUgi = loginUgi; + + // Refresh the cache if it's out of date. + if (refreshCredentials) { + if (checkTgtExecutor == null) { + checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + int checkInterval = kerberosConfig.getCheckIntervalSec(); + checkTgtExecutor.scheduleAtFixedRate( + () -> { + try { + loginUgi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + LOG.error("Fail to refresh ugi token: ", e); + } + }, + checkInterval, + checkInterval, + TimeUnit.SECONDS); + } + + return loginUgi; + } + + public File saveKeyTabFileFromUri(String path) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + String keyTabUri = kerberosConfig.getKeytab(); + Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank"); + Preconditions.checkArgument( + !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS"); + + File keytabsDir = new File("keytabs"); + if (!keytabsDir.exists()) { + keytabsDir.mkdir(); + } + File keytabFile = new File(path); + keytabFile.deleteOnExit(); + if (keytabFile.exists() && !keytabFile.delete()) { + throw new IllegalStateException( + String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath())); + } + int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec(); + FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf); + return keytabFile; + } Review Comment: The public methods loginProxyUser(), login(), and saveKeyTabFileFromUri() lack Javadoc documentation. 
These are critical security-related methods that should have comprehensive documentation describing their purpose, parameters, return values, exceptions, and any thread-safety guarantees or usage constraints. ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private volatile ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private final Configuration hadoopConf; + private final boolean refreshCredentials; + private volatile UserGroupInformation realLoginUgi; + private final String keytabFilePath; + private volatile HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + public UserGroupInformation loginProxyUser(String currentUser) { + try { + if 
(currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } + + String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, ""); + String principal = conf.getProperty(PRINCIPAL_KEY, ""); + @SuppressWarnings("null") Review Comment: The @SuppressWarnings("null") annotation is misleading here. The Splitter.on('@').splitToList() method is guaranteed to never return null - it always returns a non-null list (though it might be empty or have fewer elements than expected). The real issue being suppressed is likely a different warning. This annotation should be removed or clarified with the actual warning being suppressed. ```suggestion ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.hive.kerberos; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class FetchFileUtils { + + private FetchFileUtils() {} + + public static void fetchFileFromUri( + String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException { + try { + URI uri = new URI(fileUri); + String scheme = java.util.Optional.ofNullable(uri.getScheme()).orElse("file"); + + switch (scheme) { + case "http": + case "https": + case "ftp": + FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000); + break; + + case "file": + Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath()); + break; + + case "hdfs": Review Comment: There's a potential path traversal vulnerability here. The URI path is used directly without validation, which could allow an attacker to access files outside the intended directory by using path traversal sequences like "../". Input validation should be added to ensure the resolved path stays within expected boundaries. ```suggestion case "hdfs": // Prevent path traversal: ensure destFile is within its parent directory File destCanonical = destFile.getCanonicalFile(); File parentCanonical = destFile.getParentFile().getCanonicalFile(); if (!destCanonical.getPath().startsWith(parentCanonical.getPath() + File.separator)) { throw new SecurityException("Path traversal attempt detected: destination file is outside the allowed directory."); } ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private volatile ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private 
final Configuration hadoopConf; + private final boolean refreshCredentials; + private volatile UserGroupInformation realLoginUgi; + private final String keytabFilePath; + private volatile HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + public UserGroupInformation loginProxyUser(String currentUser) { + try { + if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } Review Comment: There's a logic error in the condition on line 71. The method returns realLoginUgi if either currentUser equals the realLoginUgi username OR if hiveClient is null. However, when hiveClient is null, user impersonation cannot work at all (as evidenced by the later getDelegationToken call), so this should be treated as an error condition rather than silently returning realLoginUgi. This could mask configuration or initialization issues. ```suggestion if (currentUser.equals(realLoginUgi.getUserName())) { return realLoginUgi; } Preconditions.checkState(hiveClient != null, "HiveClient is not set in KerberosClient"); ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private volatile ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private final Configuration hadoopConf; + private final boolean refreshCredentials; + private volatile UserGroupInformation realLoginUgi; + private final String keytabFilePath; + private volatile HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + 
Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + public UserGroupInformation loginProxyUser(String currentUser) { + try { + if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } + + String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, ""); + String principal = conf.getProperty(PRINCIPAL_KEY, ""); + @SuppressWarnings("null") + List<String> principalComponents = Splitter.on('@').splitToList(principal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + String kerberosRealm = principalComponents.get(1); + + UserGroupInformation proxyUser; + final String finalPrincipalName; + if (!currentUser.contains("@")) { + finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm); + } else { + finalPrincipalName = currentUser; + } + + proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi); + + // Acquire HMS delegation token for the proxy user and attach it to UGI + Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient"); + String tokenStr = + hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName()); + + Token<DelegationTokenIdentifier> delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenSignature)); + proxyUser.addToken(delegationToken); + + return proxyUser; + } catch (Exception e) { + throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e); + } + } + + public UserGroupInformation login() throws Exception { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + 
// Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // Login + UserGroupInformation.setConfiguration(hadoopConf); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation loginUgi = UserGroupInformation.getLoginUser(); + realLoginUgi = loginUgi; + + // Refresh the cache if it's out of date. + if (refreshCredentials) { + if (checkTgtExecutor == null) { + checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + int checkInterval = kerberosConfig.getCheckIntervalSec(); + checkTgtExecutor.scheduleAtFixedRate( + () -> { + try { + loginUgi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + LOG.error("Fail to refresh ugi token: ", e); + } + }, + checkInterval, + checkInterval, + TimeUnit.SECONDS); + } + + return loginUgi; + } + + public File saveKeyTabFileFromUri(String path) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + String keyTabUri = kerberosConfig.getKeytab(); + Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank"); + Preconditions.checkArgument( + !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS"); + + File keytabsDir = new File("keytabs"); + if (!keytabsDir.exists()) { + keytabsDir.mkdir(); + } + File keytabFile = new File(path); + keytabFile.deleteOnExit(); Review Comment: The method creates a keytab file but marks it for deletion on JVM exit with deleteOnExit(). 
This is problematic because: 1) the keytab must persist for the lifetime of the Kerberos client to support token refresh operations, so early deletion is not an option; 2) deleteOnExit() can leak memory in long-running applications, since every registered path is retained until JVM termination; and 3) the file may not be deleted at all if the JVM crashes. Consider implementing explicit cleanup in the close() method instead. ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import java.util.Map; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; +import org.apache.hadoop.conf.Configuration; + +public class KerberosConfig extends AuthenticationConfig { + public static final String KEY_TAB_URI_KEY = "authentication.kerberos.keytab-uri"; Review Comment: The AuthenticationConfig and KerberosConfig classes are missing class-level Javadoc documentation.
As public configuration classes that define the contract for Kerberos authentication settings, they should have comprehensive documentation explaining their purpose, usage, and relationship to other components. ```suggestion /** * Configuration class for Kerberos authentication settings. * <p> * This class defines the contract for configuring Kerberos authentication in Hive Metastore integrations. * It provides configuration keys and accessors for essential Kerberos parameters such as principal, * keytab location, check interval, and fetch timeout. These settings are used to establish and maintain * secure connections to services protected by Kerberos. * <p> * {@code KerberosConfig} extends {@link AuthenticationConfig}, inheriting general authentication * configuration capabilities and specializing them for Kerberos-specific requirements. * <p> * Typical usage involves constructing an instance with the relevant {@link java.util.Properties} * and Hadoop {@link org.apache.hadoop.conf.Configuration}, after which the configuration values * can be accessed via the provided getter methods. * * <p>Key configuration entries: * <ul> * <li>{@code authentication.kerberos.principal} - The Kerberos principal name.</li> * <li>{@code authentication.kerberos.keytab-uri} - The URI to the keytab file.</li> * <li>{@code authentication.kerberos.check-interval-sec} - Interval (in seconds) to check ticket validity.</li> * <li>{@code authentication.kerberos.keytab-fetch-timeout-sec} - Timeout (in seconds) for fetching the keytab.</li> * </ul> * * @see AuthenticationConfig */ public class KerberosConfig extends AuthenticationConfig { ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private volatile ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private 
final Configuration hadoopConf; + private final boolean refreshCredentials; + private volatile UserGroupInformation realLoginUgi; + private final String keytabFilePath; + private volatile HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + public UserGroupInformation loginProxyUser(String currentUser) { + try { + if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } + + String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, ""); + String principal = conf.getProperty(PRINCIPAL_KEY, ""); + @SuppressWarnings("null") + List<String> principalComponents = Splitter.on('@').splitToList(principal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + String kerberosRealm = principalComponents.get(1); + + UserGroupInformation proxyUser; + final String finalPrincipalName; + if (!currentUser.contains("@")) { + finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm); + } else { + finalPrincipalName = currentUser; + } + + proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi); + + // Acquire HMS delegation token for the proxy user and attach it to UGI + Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient"); + String tokenStr = + hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName()); + + Token<DelegationTokenIdentifier> delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenSignature)); + proxyUser.addToken(delegationToken); + + 
return proxyUser; + } catch (Exception e) { + throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e); + } + } + + public UserGroupInformation login() throws Exception { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // Login + UserGroupInformation.setConfiguration(hadoopConf); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation loginUgi = UserGroupInformation.getLoginUser(); + realLoginUgi = loginUgi; + + // Refresh the cache if it's out of date. + if (refreshCredentials) { + if (checkTgtExecutor == null) { + checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } Review Comment: The checkTgtExecutor is declared as volatile, but its initialization in the login() method is not properly synchronized. Lines 128-130 check if checkTgtExecutor is null and then create it, but this check-then-act pattern is not atomic even with volatile. In a multi-threaded environment, multiple threads could pass the null check simultaneously and create multiple executors. This should be protected with proper synchronization or use a thread-safe initialization pattern. ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private volatile ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private final Configuration hadoopConf; + private final boolean refreshCredentials; + private volatile UserGroupInformation 
realLoginUgi; + private final String keytabFilePath; + private volatile HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + public UserGroupInformation loginProxyUser(String currentUser) { + try { + if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } + + String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, ""); + String principal = conf.getProperty(PRINCIPAL_KEY, ""); + @SuppressWarnings("null") + List<String> principalComponents = Splitter.on('@').splitToList(principal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + String kerberosRealm = principalComponents.get(1); + + UserGroupInformation proxyUser; + final String finalPrincipalName; + if (!currentUser.contains("@")) { + finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm); + } else { + finalPrincipalName = currentUser; + } + + proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi); + + // Acquire HMS delegation token for the proxy user and attach it to UGI + Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient"); Review Comment: The check on line 94 verifies that hiveClient is not null, but this check is redundant given the earlier check on line 71. If hiveClient is null at line 71, the method returns realLoginUgi immediately, so line 94 can never see a null hiveClient unless another thread clears the volatile field in between. This Preconditions check should be removed, or the logic refactored to avoid the redundancy.
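The check-then-act race flagged in the earlier comment on checkTgtExecutor (volatile guarantees visibility, not atomicity) is conventionally closed either by creating the executor eagerly in the constructor or by double-checked locking. A minimal, JDK-only sketch of the latter — TgtRefreshScheduler and its method name are hypothetical stand-ins for illustration, not code from this PR:

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical stand-in for KerberosClient's lazy executor creation;
// the class and method names are illustrative only.
class TgtRefreshScheduler {

  // volatile alone does not make "check null, then assign" atomic;
  // double-checked locking ensures exactly one executor is ever created.
  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;

  ScheduledThreadPoolExecutor getOrCreateExecutor() {
    ScheduledThreadPoolExecutor executor = checkTgtExecutor;
    if (executor == null) {               // first check: lock-free fast path
      synchronized (this) {
        executor = checkTgtExecutor;
        if (executor == null) {           // second check: under the lock
          executor = new ScheduledThreadPoolExecutor(1);
          checkTgtExecutor = executor;
        }
      }
    }
    return executor;
  }
}

public class DoubleCheckedInitDemo {
  public static void main(String[] args) throws InterruptedException {
    TgtRefreshScheduler scheduler = new TgtRefreshScheduler();
    // Identity-based set collecting every executor instance the threads observe.
    Set<ScheduledThreadPoolExecutor> seen =
        Collections.newSetFromMap(new ConcurrentHashMap<>());
    Thread[] threads = new Thread[8];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(() -> seen.add(scheduler.getOrCreateExecutor()));
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    // All threads observed the same single instance.
    System.out.println("distinct executors created: " + seen.size());
    scheduler.getOrCreateExecutor().shutdownNow();
  }
}
```

Since login() is presumably invoked rarely, simply declaring the whole method synchronized would be an equally valid and simpler fix; double-checked locking only matters if the lock-free fast path is worth the extra subtlety.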
```suggestion ``` ########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.hive.kerberos; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + Review Comment: The FetchFileUtils class is missing class-level Javadoc documentation. As a utility class handling sensitive file operations (keytab files), it should have documentation explaining its purpose, security considerations, and supported URI schemes. ```suggestion /** * Utility class for fetching files from various URI schemes to a local destination. * <p> * This class is primarily used to handle sensitive file operations, such as fetching * Kerberos keytab files, from different sources including local filesystems, HTTP(S), * FTP, and HDFS. 
It supports the following URI schemes:
 * <ul>
 * <li><b>file</b>: Creates a symbolic link to the local file.</li>
 * <li><b>http</b>, <b>https</b>, <b>ftp</b>: Downloads the file using the specified protocol.</li>
 * <li><b>hdfs</b>: Copies the file from Hadoop Distributed File System.</li>
 * </ul>
 * <p>
 * <b>Security considerations:</b>
 * <ul>
 * <li>Fetching sensitive files (e.g., keytabs) from remote or untrusted sources may expose credentials.</li>
 * <li>Symbolic link creation (for <b>file</b> scheme) may have security implications if the destination is not trusted.</li>
 * <li>Ensure proper permissions and validation of URIs to avoid path traversal or SSRF vulnerabilities.</li>
 * </ul>
 */
```

########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java: ##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FetchFileUtils {
+
+  private FetchFileUtils() {}
+
+  public static void fetchFileFromUri(
+      String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException {
+    try {

Review Comment:
   The fetchFileFromUri method lacks Javadoc documentation. Given that this method handles security-sensitive file operations and supports multiple URI schemes, it should have comprehensive documentation including parameter descriptions, return value, exceptions thrown, and security considerations for each URI scheme.
   ```suggestion
   /**
    * Fetches a file from the specified URI and stores it at the given destination file.
    * <p>
    * Supported URI schemes:
    * <ul>
    * <li><b>http, https, ftp</b>: Downloads the file from the remote URL to {@code destFile} using the specified timeout.</li>
    * <li><b>file</b>: Creates a symbolic link at {@code destFile} pointing to the local file specified by {@code fileUri}.</li>
    * <li><b>hdfs</b>: Copies the file from HDFS to {@code destFile} using the provided Hadoop {@code Configuration}.</li>
    * </ul>
    * <p>
    * <b>Security considerations:</b>
    * <ul>
    * <li>When using <b>http, https, or ftp</b>, ensure the source is trusted to avoid downloading malicious files.</li>
    * <li>When using <b>file</b>, creating a symbolic link may introduce security risks such as symlink attacks or privilege escalation. Validate the source path and ensure proper permissions.</li>
    * <li>When using <b>hdfs</b>, ensure the Hadoop configuration is secure and the source path is trusted.</li>
    * </ul>
    *
    * @param fileUri the URI of the source file. Supported schemes: http, https, ftp, file, hdfs.
    * @param destFile the destination {@link File} where the file will be stored or the symlink will be created.
    * @param timeout the timeout in seconds for network operations (applies to http, https, ftp).
    * @param conf the Hadoop {@link Configuration} used for HDFS operations; may be ignored for other schemes.
    * @throws java.io.IOException if an I/O error occurs during file operations.
    * @throws IllegalArgumentException if the URI scheme is unsupported or the URI is malformed.
    */
   public static void fetchFileFromUri(
       String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException {
   ```

########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java: ##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FetchFileUtils {
+
+  private FetchFileUtils() {}
+
+  public static void fetchFileFromUri(
+      String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException {
+    try {
+      URI uri = new URI(fileUri);
+      String scheme = java.util.Optional.ofNullable(uri.getScheme()).orElse("file");
+
+      switch (scheme) {
+        case "http":
+        case "https":
+        case "ftp":
+          FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000);
+          break;
+
+        case "file":
+          Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath());
+          break;
+
+        case "hdfs":
+          FileSystem.get(conf).copyToLocalFile(new Path(uri), new Path(destFile.toURI()));
+          break;
+

Review Comment:
   This code contradicts the validation at line 154 in KerberosClient.saveKeyTabFileFromUri, which explicitly prevents HDFS URIs with the message "Keytab uri doesn't support to use HDFS". Either that validation should be removed or this HDFS case should be removed from the switch statement to maintain consistency.
   ```suggestion
   ```

########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;
+
+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt"));
+      }
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              loginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return loginUgi;
+  }
+
+  public File saveKeyTabFileFromUri(String path) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS");

Review Comment:
   The error message uses a double negative, which makes it unclear. Instead of "doesn't support to use HDFS", use "HDFS URIs are not supported for keytab files" or "does not support HDFS URIs".
   ```suggestion
           !keyTabUri.trim().startsWith("hdfs"), "HDFS URIs are not supported for keytab files");
   ```

########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java: ##########
@@ -169,7 +185,22 @@ private HiveClient createHiveClientInternal(HiveClientClassLoader classloader) {
     ClassLoader origLoader = Thread.currentThread().getContextClassLoader();
     Thread.currentThread().setContextClassLoader(classloader);
     try {
-      return createHiveClientImpl(classloader.getHiveVersion(), properties, classloader);
+      if (!enableImpersonation) {
+        return createHiveClientImpl(classloader.getHiveVersion(), properties, classloader);
+      } else {
+        UserGroupInformation ugi;
+        if (!enableKerberos) {
+          ugi = UserGroupInformation.getCurrentUser();
+          if (!ugi.getUserName().equals(PrincipalUtils.getCurrentUserName())) {
+            ugi = UserGroupInformation.createProxyUser(PrincipalUtils.getCurrentUserName(), ugi);
+          }

Review Comment:
   The logic for handling impersonation when Kerberos is disabled lacks proper validation. If a user enables impersonation without Kerberos, the code creates a proxy user based on PrincipalUtils.getCurrentUserName(), but there is no validation that this is a legitimate use case. This could potentially be exploited for privilege escalation if impersonation is enabled in a non-Kerberos environment without proper authentication controls elsewhere.

########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java: ##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE;
+import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.hive.client.HiveClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosClient implements java.io.Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class);
+
+  private volatile ScheduledThreadPoolExecutor checkTgtExecutor;
+  private final Properties conf;
+  private final Configuration hadoopConf;
+  private final boolean refreshCredentials;
+  private volatile UserGroupInformation realLoginUgi;
+  private final String keytabFilePath;
+  private volatile HiveClient hiveClient = null;
+
+  public KerberosClient(
+      Properties properties,
+      Configuration hadoopConf,
+      boolean refreshCredentials,
+      String keytabFilePath)
+      throws IOException {
+    this.conf = properties;
+    this.hadoopConf = hadoopConf;
+    this.refreshCredentials = refreshCredentials;
+    File keyTabFile = saveKeyTabFileFromUri(keytabFilePath);
+    this.keytabFilePath = keyTabFile.getAbsolutePath();
+  }
+
+  public UserGroupInformation loginProxyUser(String currentUser) {
+    try {
+      if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) {
+        return realLoginUgi;
+      }
+
+      String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, "");
+      String principal = conf.getProperty(PRINCIPAL_KEY, "");
+      @SuppressWarnings("null")
+      List<String> principalComponents = Splitter.on('@').splitToList(principal);
+      Preconditions.checkArgument(
+          principalComponents.size() == 2, "The principal has the wrong format");
+      String kerberosRealm = principalComponents.get(1);
+
+      UserGroupInformation proxyUser;
+      final String finalPrincipalName;
+      if (!currentUser.contains("@")) {
+        finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm);
+      } else {
+        finalPrincipalName = currentUser;
+      }
+
+      proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi);
+
+      // Acquire HMS delegation token for the proxy user and attach it to UGI
+      Preconditions.checkArgument(hiveClient != null, "HiveClient is not set in KerberosClient");
+      String tokenStr =
+          hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName());
+
+      Token<DelegationTokenIdentifier> delegationToken = new Token<>();
+      delegationToken.decodeFromUrlString(tokenStr);
+      delegationToken.setService(new Text(tokenSignature));
+      proxyUser.addToken(delegationToken);
+
+      return proxyUser;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e);
+    }
+  }
+
+  public UserGroupInformation login() throws Exception {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    // Check the principal and keytab file
+    String catalogPrincipal = kerberosConfig.getPrincipalName();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank");
+    List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal);
+    Preconditions.checkArgument(
+        principalComponents.size() == 2, "The principal has the wrong format");
+
+    // Login
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    realLoginUgi = loginUgi;

+    // Refresh the cache if it's out of date.
+    if (refreshCredentials) {
+      if (checkTgtExecutor == null) {
+        checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt"));
+      }
+      int checkInterval = kerberosConfig.getCheckIntervalSec();
+      checkTgtExecutor.scheduleAtFixedRate(
+          () -> {
+            try {
+              loginUgi.checkTGTAndReloginFromKeytab();
+            } catch (Exception e) {
+              LOG.error("Fail to refresh ugi token: ", e);
+            }
+          },
+          checkInterval,
+          checkInterval,
+          TimeUnit.SECONDS);
+    }
+
+    return loginUgi;
+  }
+
+  public File saveKeyTabFileFromUri(String path) throws IOException {
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank");
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS");
+
+    File keytabsDir = new File("keytabs");
+    if (!keytabsDir.exists()) {
+      keytabsDir.mkdir();
+    }
+    File keytabFile = new File(path);
+    keytabFile.deleteOnExit();
+    if (keytabFile.exists() && !keytabFile.delete()) {
+      throw new IllegalStateException(
+          String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath()));
+    }
+    int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec();
+
+    FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf);
+    return keytabFile;
+  }
+
+  private static ThreadFactory getThreadFactory(String factoryName) {
+    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + "-%d").build();
+  }
+
+  @Override
+  public void close() {
+    if (checkTgtExecutor != null) {
+      checkTgtExecutor.shutdown();
+    }
+  }
+
+  public void setHiveClient(HiveClient client) {
+    this.hiveClient = client;

Review Comment:
   The hiveClient field is declared as volatile, so writes made via setHiveClient() (line 183) are visible to other threads, but loginProxyUser() reads the field more than once: the null check on line 71 and the later getDelegationToken call are separate reads, and the field may change between them. Read the field once into a local variable, add proper synchronization, or document that setHiveClient must be called before any calls to loginProxyUser.

########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java: ##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hive.kerberos;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.hadoop.conf.Configuration;
+
+public class AuthenticationConfig extends Config {
+
+  // The key for the authentication type, currently we support Kerberos and simple

Review Comment:
   The AuthenticationConfig class is missing class-level Javadoc documentation. As a public configuration class that defines authentication settings and controls Kerberos vs simple authentication, it should have documentation explaining its purpose, the authentication types supported, and how it integrates with both catalog properties and Hadoop configuration.
   ```suggestion
   /**
    * Configuration class for authentication settings in the Hive Metastore catalog.
    * <p>
    * This class manages authentication configuration, supporting both Kerberos and simple authentication types.
    * It loads settings from both catalog properties and Hadoop configuration, allowing flexible integration.
    * <ul>
    * <li>Supported authentication types: {@code simple}, {@code kerberos}</li>
    * <li>Configuration sources: catalog properties and Hadoop {@code Configuration}</li>
    * <li>Controls impersonation and authentication type for secure access</li>
    * </ul>
    * <p>
    * The authentication type is determined by the {@code authentication.type} property, which defaults to {@code simple}
    * if not specified. When Kerberos is enabled, additional settings such as impersonation can be configured.
    */
   public class AuthenticationConfig extends Config {
   ```

########## catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java: ##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hive.kerberos;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FetchFileUtils {
+
+  private FetchFileUtils() {}
+
+  public static void fetchFileFromUri(
+      String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException {
+    try {
+      URI uri = new URI(fileUri);
+      String scheme = java.util.Optional.ofNullable(uri.getScheme()).orElse("file");
+
+      switch (scheme) {
+        case "http":
+        case "https":
+        case "ftp":
+          FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000);
+          break;
+
+        case "file":
+          Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath());

Review Comment:
   Creating a symbolic link for local file URIs could cause issues. If the source file is deleted or modified, the symlink will point to invalid or changed content. For security and reliability, it would be safer to copy the file content rather than create a symbolic link. Additionally, symbolic link creation may fail due to filesystem permissions or lack of support on certain platforms (such as Windows without administrator privileges).
   ```suggestion
          FileUtils.copyFile(new File(uri.getPath()), destFile);
   ```
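As a JDK-only sketch of the copy-instead-of-symlink behavior the comment above suggests (the helper name `fetchLocalFile` is illustrative, not part of the PR), the copy survives later deletion or modification of the source, which is exactly what a symlink does not guarantee:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CopyInsteadOfSymlink {

  // Hypothetical helper: copy the local file to destFile instead of creating
  // a symbolic link, so later changes to the source cannot affect the copy.
  static void fetchLocalFile(File src, File destFile) throws IOException {
    Files.copy(src.toPath(), destFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
  }

  public static void main(String[] args) throws IOException {
    Path src = Files.createTempFile("keytab-src", ".bin");
    Files.write(src, new byte[] {1, 2, 3});
    Path dest = Files.createTempFile("keytab-dest", ".bin");

    fetchLocalFile(src.toFile(), dest.toFile());

    // The destination is a regular file, not a symlink, and keeps its
    // content even after the source is deleted.
    System.out.println(Files.isSymbolicLink(dest)); // false
    Files.delete(src);
    System.out.println(Files.readAllBytes(dest).length); // 3
  }
}
```

Unlike `Files.createSymbolicLink`, this also works on platforms where symlink creation requires elevated privileges.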

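The volatile-field race flagged in the KerberosClient review comment above can be avoided by reading the field exactly once per call. A stand-alone sketch of that "snapshot into a local" pattern (the `HiveClient` interface here is a stand-in, not the real Gravitino type):

```java
public class VolatileSnapshot {

  // Stand-in for the real client type; only used to illustrate the pattern.
  interface HiveClient {
    String getDelegationToken(String owner, String renewer);
  }

  private volatile HiveClient hiveClient;

  public void setHiveClient(HiveClient client) {
    this.hiveClient = client;
  }

  public String fetchToken(String owner, String renewer) {
    // Snapshot the volatile field once. Checking this.hiveClient != null and
    // then dereferencing the field again would be two separate reads, and the
    // field could change (or be nulled) in between.
    HiveClient client = this.hiveClient;
    if (client == null) {
      throw new IllegalStateException("setHiveClient must be called before fetchToken");
    }
    return client.getDelegationToken(owner, renewer);
  }

  public static void main(String[] args) {
    VolatileSnapshot holder = new VolatileSnapshot();
    holder.setHiveClient((owner, renewer) -> owner + ":" + renewer);
    System.out.println(holder.fetchToken("alice", "hive")); // alice:hive
  }
}
```

The local variable guarantees the null check and the use refer to the same object, which is the minimal fix when full synchronization is not warranted.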