jiaoqingbo commented on code in PR #22694: URL: https://github.com/apache/flink/pull/22694#discussion_r1230522057
########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveDelegationTokenProvider.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.security.token; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.security.token.DelegationTokenProvider; +import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter; +import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider; +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import 
java.security.PrivilegedExceptionAction; +import java.util.Optional; + +/** Delegation token provider for Hive. */ +@Internal +public class HiveDelegationTokenProvider implements DelegationTokenProvider { + + private static final Logger LOG = LoggerFactory.getLogger(HiveDelegationTokenProvider.class); + + org.apache.hadoop.conf.Configuration hiveConf; + + private KerberosLoginProvider kerberosLoginProvider; + + private static final Text TOKEN_ALIAS = new Text("hive.server2.delegation.token"); + + @Override + public String serviceName() { + return "HiveServer2"; + } + + @Override + public void init(Configuration configuration) throws Exception { + hiveConf = getHiveConfiguration(configuration); + kerberosLoginProvider = new KerberosLoginProvider(configuration); + } + + private org.apache.hadoop.conf.Configuration getHiveConfiguration(Configuration conf) { + try { + org.apache.hadoop.conf.Configuration hadoopConf = + HadoopUtils.getHadoopConfiguration(conf); + hiveConf = new HiveConf(hadoopConf, HiveConf.class); + } catch (Exception | NoClassDefFoundError e) { + LOG.warn("Fail to create Hive Configuration", e); + } + return hiveConf; + } + + @Override + public boolean delegationTokensRequired() throws Exception { + /** + * The general rule how a provider/receiver must behave is the following: The provider and + * the receiver must be added to the classpath together with all the additionally required + * dependencies. + * + * <p>This null check is required because the Hive provider is always on classpath but Hive + * jars are optional. Such case configuration is not able to be loaded. This construct is + * intended to be removed when Hive provider/receiver pair can be externalized (namely if a + * provider/receiver throws an exception then workload must be stopped). 
+ */ + if (hiveConf == null) { + LOG.debug( + "Hive is not available (not packaged with this application), hence no " + + "tokens will be acquired."); + return false; + } + try { + if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) { + return false; + } + } catch (IOException e) { + LOG.debug("Hadoop Kerberos is not enabled."); + return false; + } + return !hiveConf.getTrimmed("hive.metastore.uris", "").isEmpty() + && kerberosLoginProvider.isLoginPossible(false); + } + + @Override + public ObtainedDelegationTokens obtainDelegationTokens() throws Exception { + UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI(); + return freshUGI.doAs( + (PrivilegedExceptionAction<ObtainedDelegationTokens>) + () -> { + Preconditions.checkNotNull(hiveConf); + Hive hive = Hive.get((HiveConf) hiveConf); + try { + LOG.info("Obtaining Kerberos security token for Hive"); + + String principal = + hiveConf.getTrimmed( + "hive.metastore.kerberos.principal", ""); + + String tokenStr = + hive.getDelegationToken( + UserGroupInformation.getCurrentUser().getUserName(), + principal); + Token<DelegationTokenIdentifier> hive2Token = new Token<>(); + hive2Token.decodeFromUrlString(tokenStr); + + Credentials credentials = new Credentials(); + credentials.addToken(TOKEN_ALIAS, hive2Token); + + return new ObtainedDelegationTokens( + HadoopDelegationTokenConverter.serialize(credentials), + Optional.empty()); Review Comment: I tried it with the latest code, and I can get the expiration time and calculate the Renewal interval correctly. 
As shown in the log below ``` 2023-06-15 14:19:24,690 INFO org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - Obtaining Kerberos security token for HiveServer2 2023-06-15 14:19:24,694 DEBUG org.apache.thrift.transport.TSaslTransport [] - writing data length: 100 2023-06-15 14:19:24,760 DEBUG org.apache.thrift.transport.TSaslTransport [] - CLIENT: reading data length: 223 2023-06-15 14:19:24,768 DEBUG org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - Got Delegation token is HIVE_DELEGATION_TOKEN owner=xxxx/[email protected], renewer=xxxx, realUser=xxxx/[email protected], issueDate=1686809946183, maxDate=1687414746183, sequenceNumber=18, masterKeyId=27 2023-06-15 14:19:24,772 DEBUG org.apache.thrift.transport.TSaslTransport [] - writing data length: 225 2023-06-15 14:19:24,777 DEBUG org.apache.thrift.transport.TSaslTransport [] - CLIENT: reading data length: 46 2023-06-15 14:19:24,778 DEBUG org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - Renewal interval is 86400079 for token HIVE_DELEGATION_TOKEN 2023-06-15 14:19:24,779 DEBUG org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - Renewal date is 1686896346262 for token HIVE_DELEGATION_TOKEN ``` At the same time, I think this method is more reasonable than obtaining it from the configuration file, because this configuration item is a server-side configuration, and it is possible that the configuration item is absent from the client-side configuration file while the server's default value is in effect. If we fail to obtain this configuration item on the client side and instead fall back to a client-side default, then we must ensure that this value is consistent with the server side. WDYT? @pvary -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
