This is an automated email from the ASF dual-hosted git repository. yuqi4733 pushed a commit to branch internal-main in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 9b2a26be5f0d1da07fa16409bf670bd29d73a7c9 Author: Yuhui <[email protected]> AuthorDate: Thu Dec 11 16:04:16 2025 +0800 [#9456] feat(hive-catalog): Support Kerberos authentication and user impersonation in the HiveClient (#9458) ### What changes were proposed in this pull request? Support Kerberos authentication and user impersonation in the HiveClient ### Why are the changes needed? Fix: #9456 ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT and IT --- .../gravitino/hive/client/HiveClientFactory.java | 72 +++++++- .../gravitino/hive/client/ProxyHiveClientImpl.java | 63 ++++++- .../hive/kerberos/AuthenticationConfig.java | 83 +++++++++ .../gravitino/hive/kerberos/FetchFileUtils.java | 63 +++++++ .../gravitino/hive/kerberos/KerberosClient.java | 197 +++++++++++++++++++++ .../gravitino/hive/kerberos/KerberosConfig.java | 93 ++++++++++ 6 files changed, 560 insertions(+), 11 deletions(-) diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java index 4442125d9e..39ceefd240 100644 --- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientFactory.java @@ -30,6 +30,9 @@ import java.lang.reflect.Method; import java.util.Properties; import org.apache.commons.lang3.reflect.MethodUtils; import org.apache.gravitino.exceptions.GravitinoRuntimeException; +import org.apache.gravitino.hive.kerberos.AuthenticationConfig; +import org.apache.gravitino.hive.kerberos.KerberosClient; +import org.apache.gravitino.utils.PrincipalUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; @@ -38,17 +41,23 @@ import org.slf4j.LoggerFactory; public final class HiveClientFactory { private static final Logger LOG = LoggerFactory.getLogger(HiveClientFactory.class); + public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-%s-keytab"; + // Remember which Hive backend classloader worked successfully for this factory. private volatile HiveClientClassLoader backendClassLoader; private final Object classLoaderLock = new Object(); - @SuppressWarnings("UnusedVariable") - private final Configuration hadoopConf; + private boolean enableKerberos; + private boolean enableImpersonation = false; + private KerberosClient kerberosClient; + private final Configuration hadoopConf; private final Properties properties; + private final String keytabPath; /** - * Creates a {@link HiveClientFactory} bound to the given configuration properties. + * Creates a {@link HiveClientFactory} bound to the given configuration + * properties. * * @param properties Hive client configuration, must not be null. * @param id An identifier for this factory instance. 
@@ -56,10 +65,18 @@ public final class HiveClientFactory { public HiveClientFactory(Properties properties, String id) { Preconditions.checkArgument(properties != null, "Properties cannot be null"); this.properties = properties; + this.keytabPath = String.format(GRAVITINO_KEYTAB_FORMAT, id); try { this.hadoopConf = new Configuration(); updateConfigurationFromProperties(properties, hadoopConf); + + initKerberosIfNecessary(); + if (enableKerberos) { + // set hive client to kerberos client for retrieving delegation token + HiveClient client = createHiveClient(); + kerberosClient.setHiveClient(client); + } } catch (Exception e) { throw new RuntimeException("Failed to initialize HiveClientFactory", e); } @@ -169,7 +186,23 @@ public final class HiveClientFactory { ClassLoader origLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(classloader); try { - return createHiveClientImpl(classloader.getHiveVersion(), properties, classloader); + if (enableImpersonation) { + UserGroupInformation ugi; + if (enableKerberos) { + ugi = kerberosClient.loginProxyUser(PrincipalUtils.getCurrentUserName()); + } else { + ugi = UserGroupInformation.getCurrentUser(); + if (!ugi.getUserName().equals(PrincipalUtils.getCurrentUserName())) { + ugi = UserGroupInformation.createProxyUser(PrincipalUtils.getCurrentUserName(), ugi); + } + } + return createProxyHiveClientImpl( + classloader.getHiveVersion(), properties, ugi, classloader); + + } else { + return createHiveClientImpl(classloader.getHiveVersion(), properties, classloader); + } + } catch (Exception e) { throw HiveExceptionConverter.toGravitinoException( e, @@ -180,18 +213,39 @@ public final class HiveClientFactory { } } + private void initKerberosIfNecessary() { + try { + AuthenticationConfig authenticationConfig = new AuthenticationConfig(properties, hadoopConf); + enableKerberos = authenticationConfig.isKerberosAuth(); + enableImpersonation = authenticationConfig.isImpersonationEnable(); + if (!enableKerberos) { + return; + } + + kerberosClient = new KerberosClient(properties, hadoopConf, true, keytabPath); + kerberosClient.login(); + + } catch (Exception e) { + throw new RuntimeException("Failed to initialize kerberos client", e); + } + } + /** Release resources held by this factory. 
*/ public void close() { - synchronized (classLoaderLock) { - try { + try { + if (kerberosClient != null) { + kerberosClient.close(); + kerberosClient = null; + } + + synchronized (classLoaderLock) { if (backendClassLoader != null) { backendClassLoader.close(); backendClassLoader = null; } - - } catch (Exception e) { - LOG.warn("Failed to close HiveClientFactory", e); } + } catch (Exception e) { + LOG.warn("Failed to close HiveClientFactory", e); } } } diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java index c62c394a04..6363148cdf 100644 --- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/ProxyHiveClientImpl.java @@ -18,12 +18,71 @@ */ package org.apache.gravitino.hive.client; +import java.io.IOException; import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Properties; +import org.apache.hadoop.security.UserGroupInformation; +/** + * A {@link HiveClient} proxy that executes all methods as a given user via {@link + * UserGroupInformation#doAs(PrivilegedExceptionAction)}. + */ public class ProxyHiveClientImpl implements InvocationHandler { + + private final HiveClient delegate; + private final UserGroupInformation ugi; + + private ProxyHiveClientImpl(HiveClient delegate, UserGroupInformation ugi) { + this.delegate = delegate; + this.ugi = ugi; + } + + /** + * Wraps a {@link HiveClient} so that all its methods are executed via {@link + * UserGroupInformation#doAs(PrivilegedExceptionAction)} of the current user. + * + * <p>Callers should ensure Kerberos has been configured and the login user is set appropriately + * (for example via keytab) before calling this method. 
+ */ + public static HiveClient createClient( + HiveClientClassLoader.HiveVersion version, UserGroupInformation ugi, Properties properties) { + try { + HiveClient client = + ugi.doAs( + (PrivilegedExceptionAction<HiveClient>) + () -> + HiveClientFactory.createHiveClientImpl( + version, properties, Thread.currentThread().getContextClassLoader())); + return (HiveClient) + Proxy.newProxyInstance( + HiveClient.class.getClassLoader(), + new Class<?>[] {HiveClient.class}, + new ProxyHiveClientImpl(client, ugi)); + + } catch (IOException | InterruptedException ex) { + throw new RuntimeException("Failed to create Kerberos Hive client", ex); + } + } + @Override - public Object invoke(Object o, Method method, Object[] objects) throws Throwable { - return null; + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + try { + return ugi.doAs((PrivilegedExceptionAction<Object>) () -> method.invoke(delegate, args)); + } catch (UndeclaredThrowableException e) { + Throwable innerException = e.getCause(); + if (innerException instanceof PrivilegedActionException) { + throw innerException.getCause(); + } else if (innerException instanceof InvocationTargetException) { + throw innerException.getCause(); + } else { + throw innerException; + } + } } } diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java new file mode 100644 index 0000000000..197bcc2dc3 --- /dev/null +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/AuthenticationConfig.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + +import java.util.Map; +import java.util.Properties; +import org.apache.gravitino.Config; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; +import org.apache.hadoop.conf.Configuration; + +/** Configuration for authentication */ +public class AuthenticationConfig extends Config { + + // The key for the authentication type, currently we support Kerberos and simple + public static final String AUTH_TYPE_KEY = "authentication.type"; + + public static final String IMPERSONATION_ENABLE_KEY = "authentication.impersonation-enable"; + + enum AuthenticationType { + SIMPLE, + KERBEROS; + } + + public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false; + + public AuthenticationConfig(Properties properties, Configuration configuration) { + super(false); + loadFromHdfsConfiguration(configuration); + loadFromMap((Map) properties, k -> true); + } + + private void loadFromHdfsConfiguration(Configuration configuration) { + String authType = configuration.get(HADOOP_SECURITY_AUTHENTICATION, "simple"); + configMap.put(AUTH_TYPE_KEY, authType); + } + + public static final ConfigEntry<String> AUTH_TYPE_ENTRY = + new ConfigBuilder(AUTH_TYPE_KEY) + .doc("The type of authentication, currently we only support simple and Kerberos") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .createWithDefault("simple"); + + public static final ConfigEntry<Boolean> ENABLE_IMPERSONATION_ENTRY = + new ConfigBuilder(IMPERSONATION_ENABLE_KEY) + .doc("Whether to enable impersonation") + .version(ConfigConstants.VERSION_1_0_0) + .booleanConf() + .createWithDefault(KERBEROS_DEFAULT_IMPERSONATION_ENABLE); + + public String getAuthType() { + return get(AUTH_TYPE_ENTRY); + } + + public boolean isKerberosAuth() { + return AuthenticationConfig.AuthenticationType.KERBEROS.name().equalsIgnoreCase(getAuthType()); + } + + public boolean isImpersonationEnable() { + return get(ENABLE_IMPERSONATION_ENTRY); + } +} diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java new file mode 100644 index 0000000000..743b1d562c --- /dev/null +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/FetchFileUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.hive.kerberos; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class FetchFileUtils { + + private FetchFileUtils() {} + + public static void fetchFileFromUri( + String fileUri, File destFile, int timeout, Configuration conf) throws java.io.IOException { + try { + URI uri = new URI(fileUri); + String scheme = java.util.Optional.ofNullable(uri.getScheme()).orElse("file"); + + switch (scheme) { + case "http": + case "https": + case "ftp": + FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000); + break; + + case "file": + Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath()); + break; + + case "hdfs": + FileSystem.get(conf).copyToLocalFile(new Path(uri), new Path(destFile.toURI())); + break; + + default: + throw new IllegalArgumentException( + String.format("The scheme '%s' is not supported", scheme)); + } + } catch (URISyntaxException ue) { + throw new IllegalArgumentException("The uri of file has the wrong format", ue); + } + } +} diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java new file mode 100644 index 0000000000..a5cfa7eddf --- /dev/null +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosClient.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.hive.kerberos; + +import static org.apache.gravitino.catalog.hive.HiveConstants.HIVE_METASTORE_TOKEN_SIGNATURE; +import static org.apache.gravitino.hive.kerberos.KerberosConfig.PRINCIPAL_KEY; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.hive.client.HiveClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Kerberos client for Hive Metastore. */ +public class KerberosClient implements java.io.Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private ScheduledThreadPoolExecutor checkTgtExecutor; + private final Properties conf; + private final Configuration hadoopConf; + private final boolean refreshCredentials; + private UserGroupInformation realLoginUgi; + private final String keytabFilePath; + private HiveClient hiveClient = null; + + public KerberosClient( + Properties properties, + Configuration hadoopConf, + boolean refreshCredentials, + String keytabFilePath) + throws IOException { + this.conf = properties; + this.hadoopConf = hadoopConf; + this.refreshCredentials = refreshCredentials; + File keyTabFile = saveKeyTabFileFromUri(keytabFilePath); + this.keytabFilePath = keyTabFile.getAbsolutePath(); + } + + /** + * Login proxy user for the given user name. 
+ * + * @param currentUser The user name to log in as + * @return The UserGroupInformation for the proxy user + */ + public UserGroupInformation loginProxyUser(String currentUser) { + try { + // hiveClient is still null while the Kerberos client itself is being initialized with the login user + if (currentUser.equals(realLoginUgi.getUserName()) || hiveClient == null) { + return realLoginUgi; + } + + String tokenSignature = conf.getProperty(HIVE_METASTORE_TOKEN_SIGNATURE, ""); + String principal = conf.getProperty(PRINCIPAL_KEY, ""); + List<String> principalComponents = Splitter.on('@').splitToList(principal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + String kerberosRealm = principalComponents.get(1); + + UserGroupInformation proxyUser; + final String finalPrincipalName; + if (!currentUser.contains("@")) { + finalPrincipalName = String.format("%s@%s", currentUser, kerberosRealm); + } else { + finalPrincipalName = currentUser; + } + + proxyUser = UserGroupInformation.createProxyUser(finalPrincipalName, realLoginUgi); + + // Acquire HMS delegation token for the proxy user and attach it to UGI + String tokenStr = + hiveClient.getDelegationToken(finalPrincipalName, realLoginUgi.getUserName()); + + Token<DelegationTokenIdentifier> delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenSignature)); + proxyUser.addToken(delegationToken); + + return proxyUser; + } catch (Exception e) { + throw new RuntimeException("Failed to create proxy user for Kerberos Hive client", e); + } + } + + /** + * Logs in the Kerberos user from the configured principal and keytab file. + * + * @return The UserGroupInformation of the login user + * @throws Exception if the Kerberos login fails + */ + public UserGroupInformation login() throws Exception { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + List<String> principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // Login + UserGroupInformation.setConfiguration(hadoopConf); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation loginUgi = UserGroupInformation.getLoginUser(); + realLoginUgi = loginUgi; + + // Periodically check the TGT and re-login from the keytab if it is out of date. 
+ if (refreshCredentials) { + if (checkTgtExecutor == null) { + checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + int checkInterval = kerberosConfig.getCheckIntervalSec(); + checkTgtExecutor.scheduleAtFixedRate( + () -> { + try { + loginUgi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + LOG.error("Fail to refresh ugi token: ", e); + } + }, + checkInterval, + checkInterval, + TimeUnit.SECONDS); + } + + return loginUgi; + } + + public File saveKeyTabFileFromUri(String path) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf); + + String keyTabUri = kerberosConfig.getKeytab(); + Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank"); + Preconditions.checkArgument( + !keyTabUri.trim().startsWith("hdfs"), "HDFS URIs are not supported for keytab files"); + + File keytabsDir = new File("keytabs"); + if (!keytabsDir.exists()) { + keytabsDir.mkdir(); + } + File keytabFile = new File(path); + keytabFile.deleteOnExit(); + if (keytabFile.exists() && !keytabFile.delete()) { + throw new IllegalStateException( + String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath())); + } + int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec(); + FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf); + return keytabFile; + } + + private static ThreadFactory getThreadFactory(String factoryName) { + return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + "-%d").build(); + } + + @Override + public void close() { + if (checkTgtExecutor != null) { + checkTgtExecutor.shutdown(); + } + } + + public void setHiveClient(HiveClient client) { + this.hiveClient = client; + } +} diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java new file mode 100644 index 0000000000..4495e0d7fc --- /dev/null +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/kerberos/KerberosConfig.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.hive.kerberos; + +import java.util.Map; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; +import org.apache.hadoop.conf.Configuration; + +/** Kerberos authentication configuration */ +public class KerberosConfig extends AuthenticationConfig { + public static final String KEY_TAB_URI_KEY = "authentication.kerberos.keytab-uri"; + + public static final String PRINCIPAL_KEY = "authentication.kerberos.principal"; + + public static final String CHECK_INTERVAL_SEC_KEY = "authentication.kerberos.check-interval-sec"; + + public static final String FETCH_TIMEOUT_SEC_KEY = + "authentication.kerberos.keytab-fetch-timeout-sec"; + + public static final ConfigEntry<String> PRINCIPAL_ENTRY = + new ConfigBuilder(PRINCIPAL_KEY) + .doc("The principal of the Kerberos connection") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry<String> KEYTAB_ENTRY = + new ConfigBuilder(KEY_TAB_URI_KEY) + .doc("The keytab of the Kerberos connection") + .version(ConfigConstants.VERSION_1_0_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry<Integer> CHECK_INTERVAL_SEC_ENTRY = + new ConfigBuilder(CHECK_INTERVAL_SEC_KEY) + .doc("The check interval of the Kerberos connection") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(60); + + public static final ConfigEntry<Integer> FETCH_TIMEOUT_SEC_ENTRY = + new ConfigBuilder(FETCH_TIMEOUT_SEC_KEY) + .doc("The fetch timeout of the Kerberos connection") + .version(ConfigConstants.VERSION_1_0_0) + .intConf() + .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(60); + + public KerberosConfig(Properties properties, Configuration configuration) { + super(properties, configuration); + loadFromMap((Map) properties, k -> true); + } + + public String getPrincipalName() { + return get(PRINCIPAL_ENTRY); + } + + public String getKeytab() { + return get(KEYTAB_ENTRY); + } + + public int getCheckIntervalSec() { + return get(CHECK_INTERVAL_SEC_ENTRY); + } + + public int getFetchTimeoutSec() { + return get(FETCH_TIMEOUT_SEC_ENTRY); + } +}
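For reference, a minimal sketch (not part of this commit) of how the new authentication properties could be handed to a HiveClientFactory. The property keys are the ones defined in AuthenticationConfig and KerberosConfig above; the metastore URI key/value, principal, realm, keytab location, and factory id are illustrative placeholders, and how catalog properties actually reach this factory in a deployed Gravitino server is not shown here.

import java.util.Properties;
import org.apache.gravitino.hive.client.HiveClientFactory;

public class KerberosHiveClientSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Hive Metastore endpoint; key and value are illustrative for this sketch.
    props.setProperty("hive.metastore.uris", "thrift://hms.example.com:9083");
    // Keys defined in AuthenticationConfig / KerberosConfig (this commit); values are placeholders.
    props.setProperty("authentication.type", "kerberos");
    props.setProperty("authentication.impersonation-enable", "true");
    props.setProperty("authentication.kerberos.principal", "gravitino/[email protected]");
    props.setProperty("authentication.kerberos.keytab-uri", "file:///etc/security/keytabs/gravitino.keytab");
    props.setProperty("authentication.kerberos.check-interval-sec", "60");
    props.setProperty("authentication.kerberos.keytab-fetch-timeout-sec", "60");

    // The id becomes part of the local keytab path via GRAVITINO_KEYTAB_FORMAT
    // ("keytabs/gravitino-%s-keytab"); "catalog-1" is a placeholder.
    HiveClientFactory factory = new HiveClientFactory(props, "catalog-1");
    // Clients created by this factory are wrapped by ProxyHiveClientImpl when
    // impersonation is enabled, so metastore calls run under the caller's UGI.
    factory.close();
  }
}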

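And a small, self-contained sketch of the Hadoop proxy-user pattern that ProxyHiveClientImpl and the non-Kerberos impersonation branch in HiveClientFactory rely on. The user name "alice" and the action body are illustrative only; in a secured cluster the real login user must additionally be authorized to impersonate other users (typically via the hadoop.proxyuser.* settings on the Metastore side).

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;

public class DoAsSketch {
  public static void main(String[] args) throws Exception {
    // The real (login) user, e.g. the Kerberos identity loaded by KerberosClient.login().
    UserGroupInformation realUser = UserGroupInformation.getCurrentUser();
    // A proxy UGI for the end user; "alice" is a placeholder.
    UserGroupInformation proxy = UserGroupInformation.createProxyUser("alice", realUser);
    // Everything inside doAs runs as the proxy user; ProxyHiveClientImpl wraps each
    // HiveClient method call in exactly this way.
    String effectiveUser =
        proxy.doAs(
            (PrivilegedExceptionAction<String>)
                () -> UserGroupInformation.getCurrentUser().getUserName());
    System.out.println(effectiveUser); // prints the proxied user name, "alice"
  }
}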