[ https://issues.apache.org/jira/browse/FLINK-22329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rui Li resolved FLINK-22329.
----------------------------
Fix Version/s: 1.14.0
Resolution: Fixed
Fixed in master: e461c7dd9615fa12fec8e158a9c201b17ab03ec3
> Missing credentials in jobconf causes repeated authentication in Hive datasource
> --------------------------------------------------------------------------------
>
> Key: FLINK-22329
> URL: https://issues.apache.org/jira/browse/FLINK-22329
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Reporter: Junfan Zhang
> Assignee: Junfan Zhang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Related Flink code:
> [https://github.com/apache/flink/blob/577113f0c339df844f2cc32b1d4a09d3da28085a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java#L107]
>
> The {{getSplits}} method calls Hadoop's {{FileInputFormat#getSplits}}. The
> related Hadoop code is
> [here|https://github.com/apache/hadoop/blob/03cfc852791c14fad39db4e5b14104a276c08e59/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L426].
> A simplified excerpt follows:
> {code:java}
> // Hadoop FileInputFormat (JobConf-based mapred API), simplified excerpt
> public InputSplit[] getSplits(JobConf job, int numSplits)
>     throws IOException {
>   StopWatch sw = new StopWatch().start();
>   FileStatus[] stats = listStatus(job);
>
>   // ... (remainder elided)
> }
>
> protected FileStatus[] listStatus(JobConf job) throws IOException {
>   Path[] dirs = getInputPaths(job);
>   if (dirs.length == 0) {
>     throw new IOException("No input paths specified in job");
>   }
>   // get tokens for all the required FileSystems..
>   TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
>
>   // Whether we need to recursive look into the directory structure
>   // ... (remainder elided)
> }
> {code}
>
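> In the {{listStatus}} method, delegation tokens are obtained by calling
> {{TokenCache.obtainTokensForNamenodes}}. However, this method skips fetching
> a delegation token when a matching one is already present in the jobconf's
> credentials. Because the jobconf carries no credentials here, tokens are
> requested (and authentication is performed) again. So it's necessary to
> inject the current UGI credentials into the jobconf.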
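To make the skip-if-present behaviour concrete, here is a small stdlib-only Java model. It is a hypothetical simulation, not Hadoop code: `TokenReuseModel`, its `Credentials` stand-in and `enumerateSplits` are illustrative names, and the model assumes each split enumeration starts from a jobconf with empty credentials. It counts how many tokens would have to be requested with and without copying the UGI's tokens into the jobconf.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model (NOT Hadoop code) of why empty jobconf
 * credentials lead to repeated token requests.
 */
final class TokenReuseModel {

    /** Stand-in for Hadoop's Credentials: delegation tokens keyed by file system service. */
    static final class Credentials {
        final Map<String, String> tokens = new HashMap<>();

        void addAll(Credentials other) {
            tokens.putAll(other.tokens);
        }
    }

    /**
     * Mimics the skip-if-present rule: a token is requested only for a
     * service that has no token in the given credentials yet.
     *
     * @return how many new tokens had to be requested (each needs authentication)
     */
    static int obtainTokensForNamenodes(Credentials credentials, String... services) {
        int requested = 0;
        for (String service : services) {
            if (!credentials.tokens.containsKey(service)) {
                requested++; // in reality: an authenticated round trip to the NameNode
                credentials.tokens.put(service, "token-for-" + service);
            }
        }
        return requested;
    }

    /**
     * One split enumeration. Assumption of this model: every call starts from a
     * jobconf whose credentials are empty unless the UGI credentials are injected.
     */
    static int enumerateSplits(Credentials ugiCredentials, boolean injectUgiCredentials, String... services) {
        Credentials jobConfCredentials = new Credentials();
        if (injectUgiCredentials) {
            jobConfCredentials.addAll(ugiCredentials); // the fix: copy UGI tokens into the jobconf
        }
        return obtainTokensForNamenodes(jobConfCredentials, services);
    }
}
```

With Hadoop's real classes, the injection step would presumably be along the lines of `jobConf.getCredentials().addAll(UserGroupInformation.getCurrentUser().getCredentials())`; whether the committed fix uses exactly this call is not confirmed here.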
>
> Besides, once Flink supports using delegation tokens directly without a keytab
> (see [FLINK-21700|https://issues.apache.org/jira/browse/FLINK-21700]),
> {{TokenCache.obtainTokensForNamenodes}} will fail without this patch:
> the jobconf holds no corresponding credentials, and without a keytab new
> tokens cannot be obtained.
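The keytab-less case can be modelled the same way. Again this is a hypothetical sketch rather than Hadoop's implementation: `KeytablessTokenModel` and `listStatusSucceeds` are made-up names, and the exception message is illustrative only. The process is assumed to hold delegation tokens but no Kerberos login, so any token that is not already in the jobconf credentials cannot be requested and listing fails.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model (NOT Hadoop code) of the keytab-less case:
 * the process holds delegation tokens but has no Kerberos login, so it cannot
 * request new tokens from a NameNode.
 */
final class KeytablessTokenModel {

    /** Stand-in for Hadoop's Credentials: delegation tokens keyed by file system service. */
    static final class Credentials {
        final Map<String, String> tokens = new HashMap<>();
    }

    /** Reuses existing tokens; requesting a new one requires a Kerberos login. */
    static void obtainTokensForNamenodes(Credentials credentials, boolean hasKerberosLogin, String... services)
            throws IOException {
        for (String service : services) {
            if (credentials.tokens.containsKey(service)) {
                continue; // token already present: nothing to request
            }
            if (!hasKerberosLogin) {
                // Illustrative message only; Hadoop's real error differs.
                throw new IOException("no Kerberos credentials to request a token for " + service);
            }
            credentials.tokens.put(service, "token-for-" + service);
        }
    }

    /** Returns true if listing succeeds for a process that has only delegation tokens (no keytab). */
    static boolean listStatusSucceeds(Credentials ugiCredentials, boolean injectUgiCredentials, String... services) {
        Credentials jobConfCredentials = new Credentials();
        if (injectUgiCredentials) {
            jobConfCredentials.tokens.putAll(ugiCredentials.tokens); // the fix: reuse the UGI tokens
        }
        try {
            obtainTokensForNamenodes(jobConfCredentials, /* hasKerberosLogin= */ false, services);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

In this model the injected tokens are the only way to satisfy the token check, which mirrors the failure described above when the jobconf starts out empty.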