This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 1e9d68b000 [KYUUBI #6368] Flink engine supports user impersonation
1e9d68b000 is described below
commit 1e9d68b000fd060c95439fce27f28f2110d371e8
Author: wforget <[email protected]>
AuthorDate: Mon Oct 21 17:32:39 2024 +0800
[KYUUBI #6368] Flink engine supports user impersonation
# :mag: Description
## Issue References 🔍
This pull request fixes #6368
## Describe Your Solution 🔧
Support impersonation mode for the Flink SQL engine.
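For illustration, a minimal sketch of the relevant settings (the keys come from this patch and the Test Plan below; presenting them as `kyuubi-defaults.conf`-style properties, and the Kerberized YARN deployment itself, are assumptions):
```
# Illustrative sketch only (assumed kyuubi-defaults.conf style); the same keys
# appear in the beeline connection string in the Test Plan below.
kyuubi.engine.type=FLINK_SQL
flink.execution.target=yarn-application
kyuubi.engine.flink.doAs.enabled=true
```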
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [X] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
Tested in the hadoop-testing environment.
Connection:
```
beeline -u
"jdbc:hive2://hadoop-master1.orb.local:10009/default;hive.server2.proxy.user=spark;principal=kyuubi/_HOSTTEST.ORG?kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application;kyuubi.engine.share.level=CONNECTION;kyuubi.engine.flink.doAs.enabled=true;"
```
SQL:
```
select 1;
```
Result:

Launch engine command:
```
2024-06-12 03:22:10.242 INFO KyuubiSessionManager-exec-pool: Thread-62
org.apache.kyuubi.engine.EngineRef: Launching engine:
/opt/flink-1.18.1/bin/flink run-application \
-t yarn-application \
-Dyarn.ship-files=/opt/flink/opt/flink-sql-client-1.18.1.jar;/opt/flink/opt/flink-sql-gateway-1.18.1.jar;/etc/hive/conf/hive-site.xml
\
-Dyarn.application.name=kyuubi_CONNECTION_FLINK_SQL_spark_6170b9aa-c690-4b50-938f-d59cca9aa2d6
\
-Dyarn.tags=KYUUBI,6170b9aa-c690-4b50-938f-d59cca9aa2d6 \
-Dcontainerized.master.env.FLINK_CONF_DIR=. \
-Dcontainerized.master.env.HIVE_CONF_DIR=. \
-Dyarn.security.appmaster.delegation.token.services=kyuubi \
-Dsecurity.delegation.token.provider.HiveServer2.enabled=false \
-Dsecurity.delegation.token.provider.hbase.enabled=false \
-Dexecution.target=yarn-application \
-Dsecurity.module.factory.classes=org.apache.flink.runtime.security.modules.JaasModuleFactory;org.apache.flink.runtime.security.modules.ZookeeperModuleFactory \
-Dsecurity.delegation.token.provider.hadoopfs.enabled=false \
-c org.apache.kyuubi.engine.flink.FlinkSQLEngine
/opt/apache-kyuubi-1.10.0-SNAPSHOT-bin/externals/engines/flink/kyuubi-flink-sql-engine_2.12-1.10.0-SNAPSHOT.jar \
--conf kyuubi.session.user=spark \
--conf kyuubi.client.ipAddress=172.20.0.5 \
--conf
kyuubi.engine.credentials=SERUUwACJnRocmlmdDovL2hhZG9vcC1tYXN0ZXIxLm9yYi5sb2NhbDo5MDgzRQAFc3BhcmsEaGl2ZShreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneevIoBkC6EIrwWDxSg03pnAB8dA295wh+Dim7Fx4FNxhVISVZFX0RFTEVHQVRJT05fVE9LRU4ADzE3Mi4yMC4wLjU6ODAyMEEABXNwYXJrAChreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneekIoBkC6EIpBHHBSket0SQnlXT5EIMN0U2fUKFRIVvBVIREZTX0RFTEVHQVRJT05fVE9LRU4PMTcyLjIwLjAuNTo4MDIwAA==
\
--conf kyuubi.engine.flink.doAs.enabled=true \
--conf
kyuubi.engine.hive.extra.classpath=/opt/hadoop/share/hadoop/client/*:/opt/hadoop/share/hadoop/mapreduce/*
\
--conf kyuubi.engine.share.level=CONNECTION \
--conf kyuubi.engine.submit.time=1718162530017 \
--conf kyuubi.engine.type=FLINK_SQL \
--conf kyuubi.frontend.protocols=THRIFT_BINARY,REST \
--conf kyuubi.ha.addresses=hadoop-master1.orb.local:2181 \
--conf kyuubi.ha.engine.ref.id=6170b9aa-c690-4b50-938f-d59cca9aa2d6
\
--conf
kyuubi.ha.namespace=/kyuubi_1.10.0-SNAPSHOT_CONNECTION_FLINK_SQL/spark/6170b9aa-c690-4b50-938f-d59cca9aa2d6
\
--conf kyuubi.server.ipAddress=172.20.0.5 \
--conf kyuubi.session.connection.url=hadoop-master1.orb.local:10009
\
--conf kyuubi.session.engine.startup.waitCompletion=false \
--conf kyuubi.session.real.user=spark
```
Launch engine log:

JobManager log:
```
2024-06-12 03:22:26,400 INFO
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] -
Loading delegation token providers
2024-06-12 03:22:26,992 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenProvider []
- Renew delegation token with engine credentials:
SERUUwACJnRocmlmdDovL2hhZG9vcC1tYXN0ZXIxLm9yYi5sb2NhbDo5MDgzRQAFc3BhcmsEaGl2ZShreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneevIoBkC6EIrwWDxSg03pnAB8dA295wh+Dim7Fx4FNxhVISVZFX0RFTEVHQVRJT05fVE9LRU4ADzE3Mi4yMC4wLjU6ODAyMEEABXNwYXJrAChreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneekIoBkC6EIpBHHBSket0SQn
[...]
2024-06-12 03:22:27,100 INFO
org.apache.kyuubi.engine.flink.FlinkEngineUtils [] - Add new
unknown token Kind: HIVE_DELEGATION_TOKEN, Service: , Ident: 00 05 73 70 61 72
6b 04 68 69 76 65 28 6b 79 75 75 62 69 2f 68 61 64 6f 6f 70 2d 6d 61 73 74 65
72 31 2e 6f 72 62 2e 6c 6f 63 61 6c 40 54 45 53 54 2e 4f 52 47 8a 01 90 0a 77
9e bc 8a 01 90 2e 84 22 bc 16 0f
2024-06-12 03:22:27,104 WARN
org.apache.kyuubi.engine.flink.FlinkEngineUtils [] - Ignore token
with earlier issue date: Kind: HDFS_DELEGATION_TOKEN, Service: 172.20.0.5:8020,
Ident: (token for spark: HDFS_DELEGATION_TOKEN owner=spark, renewer=,
realUser=kyuubi/hadoop-master1.orb.local@TEST.ORG, issueDate=1718162529936,
maxDate=1718767329936, sequenceNumber=71, masterKeyId=28)
2024-06-12 03:22:27,104 INFO
org.apache.kyuubi.engine.flink.FlinkEngineUtils [] - Update
delegation tokens. The number of tokens sent by the server is 2. The actual
number of updated tokens is 1.
......
2024-06-12 03:22:29,414 INFO
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] -
Starting tokens update task
2024-06-12 03:22:29,415 INFO
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] -
New delegation tokens arrived, sending them to receivers
2024-06-12 03:22:29,422 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver []
- Updating delegation tokens for current user
2024-06-12 03:22:29,422 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver []
- Token Service: Identifier:[10, 13, 10, 9, 8, 10, 16, -78, -36, -49, -17, -5,
49, 16, 1, 16, -100, -112, -60, -127, -8, -1, -1, -1, -1, 1]
2024-06-12 03:22:29,422 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver []
- Token Service: Identifier:[0, 5, 115, 112, 97, 114, 107, 4, 104, 105, 118,
101, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109,
97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64,
84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -68, -118, 1,
-112, 46, -124, 34, -68, 22, 15]
2024-06-12 03:22:29,422 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver []
- Token Service:172.20.0.5:8020 Identifier:[0, 5, 115, 112, 97, 114, 107, 0,
40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97,
115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84,
69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -112, -118, 1, -112,
46, -124, 34, -112, 71, 28]
2024-06-12 03:22:29,422 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver []
- Updated delegation tokens for current user successfully
```
TaskManager log:
```
2024-06-12 03:45:06,622 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive
initial delegation tokens from resource manager
2024-06-12 03:45:06,627 INFO
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] -
New delegation tokens arrived, sending them to receivers
2024-06-12 03:45:06,628 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver []
- Updating delegation tokens for current user
2024-06-12 03:45:06,629 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver []
- Token Service: Identifier:[10, 13, 10, 9, 8, 10, 16, -78, -36, -49, -17, -5,
49, 16, 1, 16, -100, -112, -60, -127, -8, -1, -1, -1, -1, 1]
2024-06-12 03:45:06,630 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver []
- Token Service: Identifier:[0, 5, 115, 112, 97, 114, 107, 4, 104, 105, 118,
101, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109,
97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64,
84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -68, -118, 1,
-112, 46, -124, 34, -68, 22, 15]
2024-06-12 03:45:06,630 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver []
- Token Service:172.20.0.5:8020 Identifier:[0, 5, 115, 112, 97, 114, 107, 0,
40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97,
115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84,
69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -112, -118, 1, -112,
46, -124, 34, -112, 71, 28]
2024-06-12 03:45:06,636 INFO
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver []
- Updated delegation tokens for current user successfully
2024-06-12 03:45:06,636 INFO
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] -
Delegation tokens sent to receivers
```
#### Related Unit Tests
---
# Checklist 📝
- [X] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6383 from wForget/KYUUBI-6368.
Closes #6368
47df43ef0 [wforget] remove doAsEnabled
984b96c74 [wforget] update settings.md
c7f8d474e [wforget] make generateTokenFile conf to internal
8632176b1 [wforget] address comments
2ec270e8a [wforget] licenses
ed0e22f4e [wforget] separate kyuubi-flink-token-provider module
b66b855b6 [wforget] address comment
d4fc2bd1d [wforget] fix
1a3dc4643 [wforget] fix style
825e2a7a0 [wforget] address comments
a679ba1c2 [wforget] revert remove renewer
cdd499b95 [wforget] fix and comment
19caec6c0 [wforget] pass token to submit process
b2991d419 [wforget] fix
7c3bdde1b [wforget] remove security.delegation.tokens.enabled check
8987c9176 [wforget] fix
5bd8cfe7c [wforget] fix
08992642d [wforget] Implement KyuubiDelegationToken Provider/Receiver
fa16d7def [wforget] enable delegation token manager
e50db7497 [wforget] [KYUUBI #6368] Support impersonation mode for flink sql
engine
Authored-by: wforget <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
---
docs/configuration/settings.md | 1 +
.../flink/kyuubi-flink-token-provider/pom.xml | 78 +++++++++++
.../token/KyuubiDelegationTokenProvider.java | 97 +++++++++++++
.../token/KyuubiDelegationTokenReceiver.java | 28 ++++
.../flink/security/token/utils/KyuubiUtils.java | 153 +++++++++++++++++++++
...ink.core.security.token.DelegationTokenProvider | 16 +++
...ink.core.security.token.DelegationTokenReceiver | 16 +++
.../kyuubi/engine/flink/FlinkEngineUtils.scala | 31 ++++-
.../kyuubi/engine/flink/FlinkSQLEngine.scala | 2 +-
.../engine/flink/FlinkTBinaryFrontendService.scala | 37 +----
.../org/apache/kyuubi/config/KyuubiConf.scala | 20 +++
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 88 +++++++++++-
pom.xml | 1 +
13 files changed, 525 insertions(+), 43 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index d1d8cc08af..a1b714d0d7 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -149,6 +149,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.event.json.log.path |
file:///tmp/kyuubi/events | The location where all the engine events go for the
built-in JSON logger.<ul><li>Local Path: start with 'file://'</li><li>HDFS
Path: start with 'hdfs://'</li></ul>
[...]
| kyuubi.engine.event.loggers | SPARK
| A comma-separated list of engine history loggers, where
engine/session/operation etc events go.<ul> <li>SPARK: the events will be
written to the Spark listener bus.</li> <li>JSON: the events will be written to
the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be
done</li> <li>CUSTOM: User-defined event handlers.</li></ul> Note that: Kyuubi
supports custom event handlers with the Jav [...]
| kyuubi.engine.flink.application.jars | <undefined>
| A comma-separated list of the local jars to be shipped with the job
to the cluster. For example, SQL UDF jars. Only effective in yarn application
mode.
[...]
+| kyuubi.engine.flink.doAs.enabled | false
| When enabled, the session user is used as the proxy user to launch
the Flink engine, otherwise, the server user. Note, due to the limitation of
Apache Flink, it can only be enabled on Kerberized environment.
[...]
| kyuubi.engine.flink.extra.classpath | <undefined>
| The extra classpath for the Flink SQL engine, for configuring the
location of hadoop client jars, etc. Only effective in yarn session mode.
[...]
| kyuubi.engine.flink.initialize.sql | SHOW DATABASES
| The initialize sql for Flink engine. It fallback to
`kyuubi.engine.initialize.sql`.
[...]
| kyuubi.engine.flink.java.options | <undefined>
| The extra Java options for the Flink SQL engine. Only effective in
yarn session mode.
[...]
diff --git a/extensions/flink/kyuubi-flink-token-provider/pom.xml
b/extensions/flink/kyuubi-flink-token-provider/pom.xml
new file mode 100644
index 0000000000..61d93682ae
--- /dev/null
+++ b/extensions/flink/kyuubi-flink-token-provider/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-parent</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>kyuubi-flink-token-provider</artifactId>
+ <packaging>jar</packaging>
+ <name>Kyuubi Flink Token Provider</name>
+ <url>https://kyuubi.apache.org/</url>
+
+ <dependencies>
+ <!-- flink -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- hadoop client -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenProvider.java
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenProvider.java
new file mode 100644
index 0000000000..8a054bc55e
--- /dev/null
+++
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenProvider.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.flink.security.token;
+
+import static
org.apache.flink.client.deployment.application.ApplicationConfiguration.APPLICATION_ARGS;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.kyuubi.flink.security.token.utils.KyuubiUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KyuubiDelegationTokenProvider implements DelegationTokenProvider {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KyuubiDelegationTokenProvider.class);
+
+ public static volatile Map<Text, Token<? extends TokenIdentifier>>
previousTokens;
+
+ private long renewalInterval;
+
+ @Override
+ public void init(Configuration configuration) throws Exception {
+ final List<String> programArgsList =
+ ConfigUtils.decodeListFromConfig(configuration, APPLICATION_ARGS,
String::new);
+ Map<String, String> kyuubiConf =
KyuubiUtils.fromCommandLineArgs(programArgsList);
+ String engineCredentials =
+ kyuubiConf.getOrDefault(KyuubiUtils.KYUUBI_ENGINE_CREDENTIALS_KEY, "");
+ if (StringUtils.isNotBlank(engineCredentials)) {
+ LOG.info("Renew delegation token with engine credentials: {}",
engineCredentials);
+ KyuubiUtils.renewDelegationToken(engineCredentials);
+ }
+ Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
+ previousTokens = new HashMap<>(credentials.getTokenMap());
+ String interval =
kyuubiConf.get(KyuubiUtils.KYUUBI_CREDENTIALS_RENEWAL_INTERVAL_KEY);
+ if (StringUtils.isNotBlank(interval)) {
+ renewalInterval = Long.parseLong(interval);
+ } else {
+ renewalInterval =
KyuubiUtils.KYUUBI_CREDENTIALS_RENEWAL_INTERVAL_DEFAULT;
+ }
+ }
+
+ @Override
+ public ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
+ Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
+ Credentials newCredentials = new Credentials();
+ for (Map.Entry<Text, Token<? extends TokenIdentifier>> tokenEntry :
+ credentials.getTokenMap().entrySet()) {
+ Text alias = tokenEntry.getKey();
+ Token<? extends TokenIdentifier> token = tokenEntry.getValue();
+ Token<? extends TokenIdentifier> previousToken =
previousTokens.get(alias);
+ if (previousToken == null || KyuubiUtils.compareIssueDate(token,
previousToken) > 0) {
+ newCredentials.addToken(alias, token);
+ }
+ }
+ previousTokens = new HashMap<>(credentials.getTokenMap());
+ Optional<Long> validUntil = Optional.of(System.currentTimeMillis() +
renewalInterval);
+ return new ObtainedDelegationTokens(
+ HadoopDelegationTokenConverter.serialize(credentials), validUntil);
+ }
+
+ @Override
+ public boolean delegationTokensRequired() throws Exception {
+ return true;
+ }
+
+ @Override
+ public String serviceName() {
+ return "kyuubi";
+ }
+}
diff --git
a/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenReceiver.java
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenReceiver.java
new file mode 100644
index 0000000000..192f67b75e
--- /dev/null
+++
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenReceiver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.flink.security.token;
+
+import
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenReceiver;
+
+public class KyuubiDelegationTokenReceiver extends
HadoopDelegationTokenReceiver {
+
+ @Override
+ public String serviceName() {
+ return "kyuubi";
+ }
+}
diff --git
a/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/utils/KyuubiUtils.java
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/utils/KyuubiUtils.java
new file mode 100644
index 0000000000..1782b3d359
--- /dev/null
+++
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/utils/KyuubiUtils.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.flink.security.token.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.*;
+import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KyuubiUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KyuubiUtils.class);
+
+ public static final String KYUUBI_ENGINE_CREDENTIALS_KEY =
"kyuubi.engine.credentials";
+ public static final String KYUUBI_CREDENTIALS_RENEWAL_INTERVAL_KEY =
+ "kyuubi.credentials.renewal.interval";
+ public static final Long KYUUBI_CREDENTIALS_RENEWAL_INTERVAL_DEFAULT =
360000L;
+
+ // org.apache.kyuubi.Utils.fromCommandLineArgs
+ public static Map<String, String> fromCommandLineArgs(List<String> args) {
+ assert args.size() % 2 == 0 : "Illegal size of arguments.";
+
+ Map<String, String> conf = new HashMap<>();
+ for (int i = 0; i < args.size(); i++) {
+ String confKey = args.get(i);
+ String confValues = args.get(++i);
+ assert confKey.equals("--conf")
+ : "Unrecognized main arguments prefix "
+ + confKey
+ + ", the argument format is '--conf k=v'.";
+ String[] confItem = confValues.split("=", 2);
+ if (confItem.length == 2) {
+ conf.put(confItem[0].trim(), confItem[1].trim());
+ } else {
+ throw new IllegalArgumentException("Illegal argument: " + confValues +
".");
+ }
+ }
+
+ return conf;
+ }
+
+ public static void renewDelegationToken(String delegationToken)
+ throws NoSuchFieldException, IllegalAccessException, IOException {
+ Credentials newCreds = decodeCredentials(delegationToken);
+ Map<Text, Token<? extends TokenIdentifier>> newTokens =
getTokenMap(newCreds);
+
+ Credentials updateCreds = new Credentials();
+ Credentials oldCreds =
UserGroupInformation.getCurrentUser().getCredentials();
+ for (Map.Entry<Text, Token<? extends TokenIdentifier>> entry :
newTokens.entrySet()) {
+ Text alias = entry.getKey();
+ Token<? extends TokenIdentifier> newToken = entry.getValue();
+ Token<? extends TokenIdentifier> oldToken =
oldCreds.getToken(entry.getKey());
+ if (oldToken != null) {
+ if (compareIssueDate(newToken, oldToken) > 0) {
+ updateCreds.addToken(alias, newToken);
+ } else {
+ LOG.warn("Ignore token with earlier issue date: {}", newToken);
+ }
+ } else {
+ LOG.info("Add new unknown token {}", newToken);
+ updateCreds.addToken(alias, newToken);
+ }
+ }
+
+ if (updateCreds.numberOfTokens() > 0) {
+ LOG.info(
+ "Update delegation tokens. The number of tokens sent by the server
is {}. "
+ + "The actual number of updated tokens is {}.",
+ newCreds.numberOfTokens(),
+ updateCreds.numberOfTokens());
+ UserGroupInformation.getCurrentUser().addCredentials(updateCreds);
+ }
+ }
+
+ public static Credentials decodeCredentials(String newValue) throws
IOException {
+ byte[] decoded = Base64.getDecoder().decode(newValue);
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(decoded);
+ Credentials creds = new Credentials();
+ creds.readTokenStorageStream(new DataInputStream(byteStream));
+ return creds;
+ }
+
+ /**
+ * Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]]
is not present before
+ * Hadoop 3.2.1.
+ */
+ public static Map<Text, Token<? extends TokenIdentifier>>
getTokenMap(Credentials credentials)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field tokenMap = Credentials.class.getDeclaredField("tokenMap");
+ tokenMap.setAccessible(true);
+ return (Map<Text, Token<? extends TokenIdentifier>>)
tokenMap.get(credentials);
+ }
+
+ public static int compareIssueDate(
+ Token<? extends TokenIdentifier> newToken, Token<? extends
TokenIdentifier> oldToken)
+ throws IOException {
+ Optional<Long> newDate = getTokenIssueDate(newToken);
+ Optional<Long> oldDate = getTokenIssueDate(oldToken);
+ if (newDate.isPresent() && oldDate.isPresent() && newDate.get() <=
oldDate.get()) {
+ return -1;
+ } else {
+ return 1;
+ }
+ }
+
+ public static Optional<Long> getTokenIssueDate(Token<? extends
TokenIdentifier> token)
+ throws IOException {
+ TokenIdentifier identifier = token.decodeIdentifier();
+ if (identifier instanceof AbstractDelegationTokenIdentifier) {
+ return Optional.of(((AbstractDelegationTokenIdentifier)
identifier).getIssueDate());
+ }
+ if (identifier == null) {
+ // TokenIdentifiers not found in ServiceLoader
+ DelegationTokenIdentifier tokenIdentifier = new
DelegationTokenIdentifier();
+ ByteArrayInputStream buf = new
ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ try {
+ tokenIdentifier.readFields(in);
+ return Optional.of(tokenIdentifier.getIssueDate());
+ } catch (Exception e) {
+ LOG.warn("Can not decode identifier of token {}, error: {}", token, e);
+ return Optional.empty();
+ }
+ }
+ LOG.debug("Unsupported TokenIdentifier kind: {}", identifier.getKind());
+ return Optional.empty();
+ }
+}
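(Not part of the patch.) For readers skimming the diff, a minimal usage sketch of the `--conf` argument parsing implemented in `KyuubiUtils` above; the standalone `main` wrapper and the sample values are assumptions for illustration:
```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.kyuubi.flink.security.token.utils.KyuubiUtils;

public class KyuubiUtilsArgsDemo {
  public static void main(String[] args) {
    // Kyuubi passes engine options as repeated "--conf key=value" program arguments,
    // which KyuubiDelegationTokenProvider#init reads back from APPLICATION_ARGS.
    List<String> programArgs = Arrays.asList(
        "--conf", "kyuubi.engine.credentials=SERUUw...",              // truncated sample value
        "--conf", "kyuubi.credentials.renewal.interval=60000");       // sample interval in ms
    Map<String, String> kyuubiConf = KyuubiUtils.fromCommandLineArgs(programArgs);
    // e.g. kyuubiConf.get("kyuubi.credentials.renewal.interval") -> "60000"
    System.out.println(kyuubiConf);
  }
}
```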
diff --git
a/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
b/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
new file mode 100644
index 0000000000..28171e5ad0
--- /dev/null
+++
b/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.kyuubi.flink.security.token.KyuubiDelegationTokenProvider
diff --git
a/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
b/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
new file mode 100644
index 0000000000..dc318d23d2
--- /dev/null
+++
b/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.kyuubi.flink.security.token.KyuubiDelegationTokenReceiver
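(Not part of the patch.) The two `META-INF/services` files above register the provider and receiver for standard `ServiceLoader` discovery; a minimal sketch of what such a lookup sees, assuming the kyuubi-flink-token-provider jar is on the classpath (Flink's own loading path may differ in detail):
```java
import java.util.ServiceLoader;
import org.apache.flink.core.security.token.DelegationTokenProvider;

public class ListTokenProviders {
  public static void main(String[] args) {
    // Any provider registered via META-INF/services (such as the "kyuubi" provider above)
    // shows up in this iteration, which is how Flink's token manager can discover it.
    for (DelegationTokenProvider provider : ServiceLoader.load(DelegationTokenProvider.class)) {
      System.out.println(provider.serviceName() + " -> " + provider.getClass().getName());
    }
  }
}
```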
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index 4293ef19f4..1f4e56920e 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -36,9 +36,10 @@ import
org.apache.flink.table.gateway.service.context.{DefaultContext, SessionCo
import org.apache.flink.table.gateway.service.result.ResultFetcher
import org.apache.flink.table.gateway.service.session.Session
import org.apache.flink.util.JarUtils
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.kyuubi.{KyuubiException, Logging}
-import org.apache.kyuubi.util.SemanticVersion
+import org.apache.kyuubi.util.{KyuubiHadoopUtils, SemanticVersion}
import org.apache.kyuubi.util.reflect.ReflectUtils._
object FlinkEngineUtils extends Logging {
@@ -166,4 +167,32 @@ object FlinkEngineUtils extends Logging {
}).toList
} else null
}
+
+ def renewDelegationToken(delegationToken: String): Unit = {
+ val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken)
+ val newTokens = KyuubiHadoopUtils.getTokenMap(newCreds)
+
+ val updateCreds = new Credentials()
+ val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
+ newTokens.foreach { case (alias, newToken) =>
+ val oldToken = oldCreds.getToken(alias)
+ if (oldToken != null) {
+ if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
+ updateCreds.addToken(alias, newToken)
+ } else {
+ warn(s"Ignore token with earlier issue date: $newToken")
+ }
+ } else {
+ info(s"Add new unknown token $newToken")
+ updateCreds.addToken(alias, newToken)
+ }
+ }
+
+ if (updateCreds.numberOfTokens() > 0) {
+ info("Update delegation tokens. " +
+ s"The number of tokens sent by the server is
${newCreds.numberOfTokens()}. " +
+ s"The actual number of updated tokens is
${updateCreds.numberOfTokens()}.")
+ UserGroupInformation.getCurrentUser.addCredentials(updateCreds)
+ }
+ }
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 83149735fb..bc772ced8f 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -102,7 +102,7 @@ object FlinkSQLEngine extends Logging {
val engineCredentials =
kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
engineCredentials.filter(_.nonEmpty).foreach { credentials =>
- FlinkTBinaryFrontendService.renewDelegationToken(credentials)
+ FlinkEngineUtils.renewDelegationToken(credentials)
}
val engineContext = FlinkEngineUtils.getDefaultContext(args, flinkConf,
flinkConfDir)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
index 79a6dff967..f2161599e4 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
@@ -17,15 +17,12 @@
package org.apache.kyuubi.engine.flink
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import
org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService.renewDelegationToken
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils.renewDelegationToken
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
import org.apache.kyuubi.service.TFrontendService.OK_STATUS
import
org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TRenewDelegationTokenReq,
TRenewDelegationTokenResp}
-import org.apache.kyuubi.util.KyuubiHadoopUtils
class FlinkTBinaryFrontendService(
override val serverable: Serverable)
@@ -56,33 +53,3 @@ class FlinkTBinaryFrontendService(
}
}
-
-object FlinkTBinaryFrontendService extends Logging {
- private[flink] def renewDelegationToken(delegationToken: String): Unit = {
- val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken)
- val newTokens = KyuubiHadoopUtils.getTokenMap(newCreds)
-
- val updateCreds = new Credentials()
- val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
- newTokens.foreach { case (alias, newToken) =>
- val oldToken = oldCreds.getToken(alias)
- if (oldToken != null) {
- if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
- updateCreds.addToken(alias, newToken)
- } else {
- warn(s"Ignore token with earlier issue date: $newToken")
- }
- } else {
- info(s"Add new unknown token $newToken")
- updateCreds.addToken(alias, newToken)
- }
- }
-
- if (updateCreds.numberOfTokens() > 0) {
- info("Update delegation tokens. " +
- s"The number of tokens sent by the server is
${newCreds.numberOfTokens()}. " +
- s"The actual number of updated tokens is
${updateCreds.numberOfTokens()}.")
- UserGroupInformation.getCurrentUser.addCredentials(updateCreds)
- }
- }
-}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 901703924e..436ac5fd16 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -3017,6 +3017,26 @@ object KyuubiConf {
.version("1.8.1")
.fallbackConf(ENGINE_INITIALIZE_SQL)
+ val ENGINE_FLINK_DOAS_ENABLED: ConfigEntry[Boolean] =
+ buildConf("kyuubi.engine.flink.doAs.enabled")
+ .doc("When enabled, the session user is used as the proxy user to launch
the Flink engine," +
+ " otherwise, the server user. Note, due to the limitation of Apache
Flink," +
+ " it can only be enabled on Kerberized environment.")
+ .version("1.10.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ val ENGINE_FLINK_DOAS_GENERATE_TOKEN_FILE: ConfigEntry[Boolean] =
+ buildConf("kyuubi.engine.flink.doAs.generateTokenFile")
+ .internal
+ .doc(s"When ${ENGINE_FLINK_DOAS_ENABLED.key}=true and neither
FLINK-35525 (Flink 1.20.0)" +
+ " nor YARN-10333 (Hadoop 3.4.0) is available, enable this
configuration to generate a" +
+ " temporary HADOOP_TOKEN_FILE that will be picked up by the Flink
engine bootstrap" +
+ " process.")
+ .version("1.10.0")
+ .booleanConf
+ .createWithDefault(false)
+
val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
buildConf("kyuubi.server.limit.connections.per.user")
.doc("Maximum kyuubi server connections per user." +
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 9597c974f9..9f6e7e68c1 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -24,14 +24,18 @@ import scala.collection.mutable
import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
+import
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY,
KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.{ApplicationManagerInfo,
KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.KyuubiHadoopUtils
import org.apache.kyuubi.util.command.CommandLineUtils._
/**
@@ -59,14 +63,31 @@ class FlinkProcessBuilder(
// flink.execution.target are required in Kyuubi conf currently
val executionTarget: Option[String] =
conf.getOption("flink.execution.target")
+ private lazy val proxyUserEnable: Boolean = {
+ var flinkDoAsEnabled = conf.get(ENGINE_FLINK_DOAS_ENABLED)
+ if (flinkDoAsEnabled && !UserGroupInformation.isSecurityEnabled) {
+ warn(s"${ENGINE_FLINK_DOAS_ENABLED.key} can only be enabled on
Kerberized environment.")
+ flinkDoAsEnabled = false
+ }
+ flinkDoAsEnabled
+ }
+
override protected def module: String = "kyuubi-flink-sql-engine"
override protected def mainClass: String =
"org.apache.kyuubi.engine.flink.FlinkSQLEngine"
- override def env: Map[String, String] = conf.getEnvs +
- ("FLINK_CONF_DIR" -> conf.getEnvs.getOrElse(
- "FLINK_CONF_DIR",
- s"$flinkHome${File.separator}conf"))
+ override def env: Map[String, String] = {
+ val flinkConfDir =
+ conf.getEnvs.getOrElse("FLINK_CONF_DIR",
s"$flinkHome${File.separator}conf")
+ val flinkExtraEnvs = if (proxyUserEnable) {
+ Map(
+ "FLINK_CONF_DIR" -> flinkConfDir,
+ FLINK_PROXY_USER_KEY -> proxyUser) ++ generateTokenFile()
+ } else {
+ Map("FLINK_CONF_DIR" -> flinkConfDir)
+ }
+ conf.getEnvs ++ flinkExtraEnvs
+ }
override def clusterManager(): Option[String] = {
executionTarget match {
@@ -114,7 +135,29 @@ class FlinkProcessBuilder(
flinkExtraJars += s"$hiveConfFile"
}
- val customFlinkConf = conf.getAllWithPrefix(FLINK_CONF_PREFIX, "")
+ val externalProxyUserConf: Map[String, String] = if (proxyUserEnable) {
+ // FLINK-31109 (1.17.0): Flink only supports hadoop proxy user when
delegation tokens
+ // fetch is managed outside, but disabling
`security.delegation.tokens.enabled` will cause
+ // delegation token updates on JobManager not to be passed to
TaskManagers.
+ // Based on the solution in
+ //
https://github.com/apache/flink/pull/22009#issuecomment-2122226755, we removed
+ // `HadoopModuleFactory` from `security.module.factory.classes` and
disabled delegation
+ // token providers (hadoopfs/hbase/HiveServer2) that do not support
proxyUser.
+ // FLINK-35525: We need to add
`yarn.security.appmaster.delegation.token.services=kyuubi`
+ // configuration to pass hdfs token obtained by kyuubi provider to
the yarn client.
+ Map(
+ "security.module.factory.classes" ->
+ ("org.apache.flink.runtime.security.modules.JaasModuleFactory;" +
+
"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory"),
+ "security.delegation.token.provider.hadoopfs.enabled" -> "false",
+ "security.delegation.token.provider.hbase.enabled" -> "false",
+ "security.delegation.token.provider.HiveServer2.enabled" ->
"false",
+ "yarn.security.appmaster.delegation.token.services" -> "kyuubi")
+ } else {
+ Map.empty
+ }
+
+ val customFlinkConf = conf.getAllWithPrefix(FLINK_CONF_PREFIX, "") ++
externalProxyUserConf
// add custom yarn.ship-files
flinkExtraJars ++= customFlinkConf.get(YARN_SHIP_FILES_KEY)
val yarnAppName = customFlinkConf.get(YARN_APPLICATION_NAME_KEY)
@@ -214,6 +257,37 @@ class FlinkProcessBuilder(
}
}
+ @volatile private var tokenTempDir: java.nio.file.Path = _
+ private def generateTokenFile(): Option[(String, String)] = {
+ if (conf.get(ENGINE_FLINK_DOAS_GENERATE_TOKEN_FILE)) {
+ // We disabled `hadoopfs` token service, which may cause yarn client to
miss hdfs token.
+ // So we generate a hadoop token file to pass kyuubi engine tokens to
submit process.
+ // TODO: Removed this after FLINK-35525 (1.20.0), delegation tokens will
be passed
+ // by `kyuubi` provider
+ conf.getOption(KYUUBI_ENGINE_CREDENTIALS_KEY).map { encodedCredentials =>
+ val credentials =
KyuubiHadoopUtils.decodeCredentials(encodedCredentials)
+ tokenTempDir = Utils.createTempDir()
+ val file =
s"${tokenTempDir.toString}/kyuubi_credentials_${System.currentTimeMillis()}"
+ credentials.writeTokenStorageFile(new Path(s"file://$file"), new
Configuration())
+ info(s"Generated hadoop token file: $file")
+ "HADOOP_TOKEN_FILE_LOCATION" -> file
+ }
+ } else {
+ None
+ }
+ }
+
+ override def close(destroyProcess: Boolean): Unit = {
+ super.close(destroyProcess)
+ if (tokenTempDir != null) {
+ try {
+ Utils.deleteDirectoryRecursively(tokenTempDir.toFile)
+ } catch {
+ case e: Throwable => error(s"Error deleting token temp dir:
$tokenTempDir", e)
+ }
+ }
+ }
+
override def shortName: String = "flink"
}
@@ -227,4 +301,6 @@ object FlinkProcessBuilder {
final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
+ final val FLINK_SECURITY_KEYTAB_KEY = "security.kerberos.login.keytab"
+ final val FLINK_SECURITY_PRINCIPAL_KEY = "security.kerberos.login.principal"
}
diff --git a/pom.xml b/pom.xml
index b2eafa76a8..18eb69ef12 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
<modules>
<module>dev/kyuubi-codecov</module>
<module>extensions/server/kyuubi-server-plugin</module>
+ <module>extensions/flink/kyuubi-flink-token-provider</module>
<module>extensions/spark/kyuubi-extension-spark-jdbc-dialect</module>
<module>extensions/spark/kyuubi-spark-authz</module>
<module>extensions/spark/kyuubi-spark-authz-shaded</module>