This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e9d68b000 [KYUUBI #6368] Flink engine supports user impersonation
1e9d68b000 is described below

commit 1e9d68b000fd060c95439fce27f28f2110d371e8
Author: wforget <[email protected]>
AuthorDate: Mon Oct 21 17:32:39 2024 +0800

    [KYUUBI #6368] Flink engine supports user impersonation
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    This pull request fixes #6368
    
    ## Describe Your Solution ๐Ÿ”ง
    
    Support impersonation mode for flink sql engine.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [X] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    Test in hadoop-testing env.
    
    Connection:
    
    ```
    beeline -u 
"jdbc:hive2://hadoop-master1.orb.local:10009/default;hive.server2.proxy.user=spark;principal=kyuubi/_HOSTTEST.ORG?kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application;kyuubi.engine.share.level=CONNECTION;kyuubi.engine.flink.doAs.enabled=true;"
    ```
    
    sql:
    
    ```
    select 1;
    ```
    
    result:
    
    
![image](https://github.com/apache/kyuubi/assets/17894939/4bde3e4e-0dac-4e09-ac7c-a2c3a3607a13)
    
    launch engine command:
    
    ```
    2024-06-12 03:22:10.242 INFO KyuubiSessionManager-exec-pool: Thread-62 
org.apache.kyuubi.engine.EngineRef: Launching engine:
    /opt/flink-1.18.1/bin/flink run-application \
            -t yarn-application \
            
-Dyarn.ship-files=/opt/flink/opt/flink-sql-client-1.18.1.jar;/opt/flink/opt/flink-sql-gateway-1.18.1.jar;/etc/hive/conf/hive-site.xml
 \
            
-Dyarn.application.name=kyuubi_CONNECTION_FLINK_SQL_spark_6170b9aa-c690-4b50-938f-d59cca9aa2d6
 \
            -Dyarn.tags=KYUUBI,6170b9aa-c690-4b50-938f-d59cca9aa2d6 \
            -Dcontainerized.master.env.FLINK_CONF_DIR=. \
            -Dcontainerized.master.env.HIVE_CONF_DIR=. \
            -Dyarn.security.appmaster.delegation.token.services=kyuubi \
            -Dsecurity.delegation.token.provider.HiveServer2.enabled=false \
            -Dsecurity.delegation.token.provider.hbase.enabled=false \
            -Dexecution.target=yarn-application \
            
-Dsecurity.module.factory.classes=org.apache.flink.runtime.security.modules.JaasModuleFactory;org.apache.flink.runtime.security.modules.ZookeeperModuleFa
    ctory \
            -Dsecurity.delegation.token.provider.hadoopfs.enabled=false \
            -c org.apache.kyuubi.engine.flink.FlinkSQLEngine 
/opt/apache-kyuubi-1.10.0-SNAPSHOT-bin/externals/engines/flink/kyuubi-flink-sql-engine_2.12-1.10.0-SNAPS
    HOT.jar \
            --conf kyuubi.session.user=spark \
            --conf kyuubi.client.ipAddress=172.20.0.5 \
            --conf 
kyuubi.engine.credentials=SERUUwACJnRocmlmdDovL2hhZG9vcC1tYXN0ZXIxLm9yYi5sb2NhbDo5MDgzRQAFc3BhcmsEaGl2ZShreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2Fs
    
QFRFU1QuT1JHigGQCneevIoBkC6EIrwWDxSg03pnAB8dA295wh+Dim7Fx4FNxhVISVZFX0RFTEVHQVRJT05fVE9LRU4ADzE3Mi4yMC4wLjU6ODAyMEEABXNwYXJrAChreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiL
    
mxvY2FsQFRFU1QuT1JHigGQCneekIoBkC6EIpBHHBSket0SQnlXT5EIMN0U2fUKFRIVvBVIREZTX0RFTEVHQVRJT05fVE9LRU4PMTcyLjIwLjAuNTo4MDIwAA==
 \
            --conf kyuubi.engine.flink.doAs.enabled=true \
            --conf 
kyuubi.engine.hive.extra.classpath=/opt/hadoop/share/hadoop/client/*:/opt/hadoop/share/hadoop/mapreduce/*
 \
            --conf kyuubi.engine.share.level=CONNECTION \
            --conf kyuubi.engine.submit.time=1718162530017 \
            --conf kyuubi.engine.type=FLINK_SQL \
            --conf kyuubi.frontend.protocols=THRIFT_BINARY,REST \
            --conf kyuubi.ha.addresses=hadoop-master1.orb.local:2181 \
            --conf kyuubi.ha.engine.ref.id=6170b9aa-c690-4b50-938f-d59cca9aa2d6 
\
            --conf 
kyuubi.ha.namespace=/kyuubi_1.10.0-SNAPSHOT_CONNECTION_FLINK_SQL/spark/6170b9aa-c690-4b50-938f-d59cca9aa2d6
 \
            --conf kyuubi.server.ipAddress=172.20.0.5 \
            --conf kyuubi.session.connection.url=hadoop-master1.orb.local:10009 
\
            --conf kyuubi.session.engine.startup.waitCompletion=false \
            --conf kyuubi.session.real.user=spark
    ```
    
    launch engine log:
    
    
![image](https://github.com/apache/kyuubi/assets/17894939/590463a8-2858-47a2-8897-0ddfbe3ffdf6)
    
    jobmanager job:
    
    ```
    2024-06-12 03:22:26,400 INFO  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - 
Loading delegation token providers
    2024-06-12 03:22:26,992 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenProvider [] 
- Renew delegation token with engine credentials: 
SERUUwACJnRocmlmdDovL2hhZG9vcC1tYXN0ZXIxLm9yYi5sb2NhbDo5MDgzRQAFc3BhcmsEaGl2ZShreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneevIoBkC6EIrwWDxSg03pnAB8dA295wh+Dim7Fx4FNxhVISVZFX0RFTEVHQVRJT05fVE9LRU4ADzE3Mi4yMC4wLjU6ODAyMEEABXNwYXJrAChreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneekIoBkC6EIpBHHBSket0SQn
 [...]
    2024-06-12 03:22:27,100 INFO  
org.apache.kyuubi.engine.flink.FlinkEngineUtils              [] - Add new 
unknown token Kind: HIVE_DELEGATION_TOKEN, Service: , Ident: 00 05 73 70 61 72 
6b 04 68 69 76 65 28 6b 79 75 75 62 69 2f 68 61 64 6f 6f 70 2d 6d 61 73 74 65 
72 31 2e 6f 72 62 2e 6c 6f 63 61 6c 40 54 45 53 54 2e 4f 52 47 8a 01 90 0a 77 
9e bc 8a 01 90 2e 84 22 bc 16 0f
    2024-06-12 03:22:27,104 WARN  
org.apache.kyuubi.engine.flink.FlinkEngineUtils              [] - Ignore token 
with earlier issue date: Kind: HDFS_DELEGATION_TOKEN, Service: 172.20.0.5:8020, 
Ident: (token for spark: HDFS_DELEGATION_TOKEN owner=spark, renewer=, 
realUser=kyuubi/hadoop-master1.orb.localTEST.ORG, issueDate=1718162529936, 
maxDate=1718767329936, sequenceNumber=71, masterKeyId=28)
    2024-06-12 03:22:27,104 INFO  
org.apache.kyuubi.engine.flink.FlinkEngineUtils              [] - Update 
delegation tokens. The number of tokens sent by the server is 2. The actual 
number of updated tokens is 1.
    ......
    4-06-12 03:22:29,414 INFO  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - 
Starting tokens update task
    2024-06-12 03:22:29,415 INFO  
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - 
New delegation tokens arrived, sending them to receivers
    2024-06-12 03:22:29,422 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] 
- Updating delegation tokens for current user
    2024-06-12 03:22:29,422 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] 
- Token Service: Identifier:[10, 13, 10, 9, 8, 10, 16, -78, -36, -49, -17, -5, 
49, 16, 1, 16, -100, -112, -60, -127, -8, -1, -1, -1, -1, 1]
    2024-06-12 03:22:29,422 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] 
- Token Service: Identifier:[0, 5, 115, 112, 97, 114, 107, 4, 104, 105, 118, 
101, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 
97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 
84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -68, -118, 1, 
-112, 46, -124, 34, -68, 22, 15]
    2024-06-12 03:22:29,422 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] 
- Token Service:172.20.0.5:8020 Identifier:[0, 5, 115, 112, 97, 114, 107, 0, 
40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97, 
115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84, 
69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -112, -118, 1, -112, 
46, -124, 34, -112, 71, 28]
    2024-06-12 03:22:29,422 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] 
- Updated delegation tokens for current user successfully
    
    ```
    
    taskmanager log:
    
    ```
    2024-06-12 03:45:06,622 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive 
initial delegation tokens from resource manager
    2024-06-12 03:45:06,627 INFO  
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - 
New delegation tokens arrived, sending them to receivers
    2024-06-12 03:45:06,628 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] 
- Updating delegation tokens for current user
    2024-06-12 03:45:06,629 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] 
- Token Service: Identifier:[10, 13, 10, 9, 8, 10, 16, -78, -36, -49, -17, -5, 
49, 16, 1, 16, -100, -112, -60, -127, -8, -1, -1, -1, -1, 1]
    2024-06-12 03:45:06,630 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] 
- Token Service: Identifier:[0, 5, 115, 112, 97, 114, 107, 4, 104, 105, 118, 
101, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 
97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 
84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -68, -118, 1, 
-112, 46, -124, 34, -68, 22, 15]
    2024-06-12 03:45:06,630 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] 
- Token Service:172.20.0.5:8020 Identifier:[0, 5, 115, 112, 97, 114, 107, 0, 
40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97, 
115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84, 
69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -112, -118, 1, -112, 
46, -124, 34, -112, 71, 28]
    2024-06-12 03:45:06,636 INFO  
org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] 
- Updated delegation tokens for current user successfully
    2024-06-12 03:45:06,636 INFO  
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - 
Delegation tokens sent to receivers
    ```
    
    #### Related Unit Tests
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [X] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6383 from wForget/KYUUBI-6368.
    
    Closes #6368
    
    47df43ef0 [wforget] remove doAsEnabled
    984b96c74 [wforget] update settings.md
    c7f8d474e [wforget] make generateTokenFile conf to internal
    8632176b1 [wforget] address comments
    2ec270e8a [wforget] licenses
    ed0e22f4e [wforget] separate kyuubi-flink-token-provider module
    b66b855b6 [wforget] address comment
    d4fc2bd1d [wforget] fix
    1a3dc4643 [wforget] fix style
    825e2a7a0 [wforget] address comments
    a679ba1c2 [wforget] revert remove renewer
    cdd499b95 [wforget] fix and comment
    19caec6c0 [wforget] pass token to submit process
    b2991d419 [wforget] fix
    7c3bdde1b [wforget] remove security.delegation.tokens.enabled check
    8987c9176 [wforget] fix
    5bd8cfe7c [wforget] fix
    08992642d [wforget] Implement KyuubiDelegationToken Provider/Receiver
    fa16d7def [wforget] enable delegation token manager
    e50db7497 [wforget] [KYUUBI #6368] Support impersonation mode for flink sql 
engine
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: Bowen Liang <[email protected]>
---
 docs/configuration/settings.md                     |   1 +
 .../flink/kyuubi-flink-token-provider/pom.xml      |  78 +++++++++++
 .../token/KyuubiDelegationTokenProvider.java       |  97 +++++++++++++
 .../token/KyuubiDelegationTokenReceiver.java       |  28 ++++
 .../flink/security/token/utils/KyuubiUtils.java    | 153 +++++++++++++++++++++
 ...ink.core.security.token.DelegationTokenProvider |  16 +++
 ...ink.core.security.token.DelegationTokenReceiver |  16 +++
 .../kyuubi/engine/flink/FlinkEngineUtils.scala     |  31 ++++-
 .../kyuubi/engine/flink/FlinkSQLEngine.scala       |   2 +-
 .../engine/flink/FlinkTBinaryFrontendService.scala |  37 +----
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  20 +++
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  |  88 +++++++++++-
 pom.xml                                            |   1 +
 13 files changed, 525 insertions(+), 43 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index d1d8cc08af..a1b714d0d7 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -149,6 +149,7 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.engine.event.json.log.path                        | 
file:///tmp/kyuubi/events | The location where all the engine events go for the 
built-in JSON logger.<ul><li>Local Path: start with 'file://'</li><li>HDFS 
Path: start with 'hdfs://'</li></ul>                                            
                                                                                
                                                                                
                                     [...]
 | kyuubi.engine.event.loggers                              | SPARK             
        | A comma-separated list of engine history loggers, where 
engine/session/operation etc events go.<ul> <li>SPARK: the events will be 
written to the Spark listener bus.</li> <li>JSON: the events will be written to 
the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be 
done</li> <li>CUSTOM: User-defined event handlers.</li></ul> Note that: Kyuubi 
supports custom event handlers with the Jav [...]
 | kyuubi.engine.flink.application.jars                     | &lt;undefined&gt; 
        | A comma-separated list of the local jars to be shipped with the job 
to the cluster. For example, SQL UDF jars. Only effective in yarn application 
mode.                                                                           
                                                                                
                                                                                
                  [...]
+| kyuubi.engine.flink.doAs.enabled                         | false             
        | When enabled, the session user is used as the proxy user to launch 
the Flink engine, otherwise, the server user. Note, due to the limitation of 
Apache Flink, it can only be enabled on Kerberized environment.                 
                                                                                
                                                                                
                    [...]
 | kyuubi.engine.flink.extra.classpath                      | &lt;undefined&gt; 
        | The extra classpath for the Flink SQL engine, for configuring the 
location of hadoop client jars, etc. Only effective in yarn session mode.       
                                                                                
                                                                                
                                                                                
                  [...]
 | kyuubi.engine.flink.initialize.sql                       | SHOW DATABASES    
        | The initialize sql for Flink engine. It fallback to 
`kyuubi.engine.initialize.sql`.                                                 
                                                                                
                                                                                
                                                                                
                                [...]
 | kyuubi.engine.flink.java.options                         | &lt;undefined&gt; 
        | The extra Java options for the Flink SQL engine. Only effective in 
yarn session mode.                                                              
                                                                                
                                                                                
                                                                                
                 [...]
diff --git a/extensions/flink/kyuubi-flink-token-provider/pom.xml 
b/extensions/flink/kyuubi-flink-token-provider/pom.xml
new file mode 100644
index 0000000000..61d93682ae
--- /dev/null
+++ b/extensions/flink/kyuubi-flink-token-provider/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.kyuubi</groupId>
+        <artifactId>kyuubi-parent</artifactId>
+        <version>1.10.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>kyuubi-flink-token-provider</artifactId>
+    <packaging>jar</packaging>
+    <name>Kyuubi Flink Token Provider</name>
+    <url>https://kyuubi.apache.org/</url>
+
+    <dependencies>
+        <!-- flink -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- hadoop client -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenProvider.java
 
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenProvider.java
new file mode 100644
index 0000000000..8a054bc55e
--- /dev/null
+++ 
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenProvider.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.flink.security.token;
+
+import static 
org.apache.flink.client.deployment.application.ApplicationConfiguration.APPLICATION_ARGS;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.kyuubi.flink.security.token.utils.KyuubiUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KyuubiDelegationTokenProvider implements DelegationTokenProvider {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KyuubiDelegationTokenProvider.class);
+
+  public static volatile Map<Text, Token<? extends TokenIdentifier>> 
previousTokens;
+
+  private long renewalInterval;
+
+  @Override
+  public void init(Configuration configuration) throws Exception {
+    final List<String> programArgsList =
+        ConfigUtils.decodeListFromConfig(configuration, APPLICATION_ARGS, 
String::new);
+    Map<String, String> kyuubiConf = 
KyuubiUtils.fromCommandLineArgs(programArgsList);
+    String engineCredentials =
+        kyuubiConf.getOrDefault(KyuubiUtils.KYUUBI_ENGINE_CREDENTIALS_KEY, "");
+    if (StringUtils.isNotBlank(engineCredentials)) {
+      LOG.info("Renew delegation token with engine credentials: {}", 
engineCredentials);
+      KyuubiUtils.renewDelegationToken(engineCredentials);
+    }
+    Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
+    previousTokens = new HashMap<>(credentials.getTokenMap());
+    String interval = 
kyuubiConf.get(KyuubiUtils.KYUUBI_CREDENTIALS_RENEWAL_INTERVAL_KEY);
+    if (StringUtils.isNotBlank(interval)) {
+      renewalInterval = Long.parseLong(interval);
+    } else {
+      renewalInterval = 
KyuubiUtils.KYUUBI_CREDENTIALS_RENEWAL_INTERVAL_DEFAULT;
+    }
+  }
+
+  @Override
+  public ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
+    Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
+    Credentials newCredentials = new Credentials();
+    for (Map.Entry<Text, Token<? extends TokenIdentifier>> tokenEntry :
+        credentials.getTokenMap().entrySet()) {
+      Text alias = tokenEntry.getKey();
+      Token<? extends TokenIdentifier> token = tokenEntry.getValue();
+      Token<? extends TokenIdentifier> previousToken = 
previousTokens.get(alias);
+      if (previousToken == null || KyuubiUtils.compareIssueDate(token, 
previousToken) > 0) {
+        newCredentials.addToken(alias, token);
+      }
+    }
+    previousTokens = new HashMap<>(credentials.getTokenMap());
+    Optional<Long> validUntil = Optional.of(System.currentTimeMillis() + 
renewalInterval);
+    return new ObtainedDelegationTokens(
+        HadoopDelegationTokenConverter.serialize(credentials), validUntil);
+  }
+
+  @Override
+  public boolean delegationTokensRequired() throws Exception {
+    return true;
+  }
+
+  @Override
+  public String serviceName() {
+    return "kyuubi";
+  }
+}
diff --git 
a/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenReceiver.java
 
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenReceiver.java
new file mode 100644
index 0000000000..192f67b75e
--- /dev/null
+++ 
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/KyuubiDelegationTokenReceiver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.flink.security.token;
+
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenReceiver;
+
+public class KyuubiDelegationTokenReceiver extends 
HadoopDelegationTokenReceiver {
+
+  @Override
+  public String serviceName() {
+    return "kyuubi";
+  }
+}
diff --git 
a/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/utils/KyuubiUtils.java
 
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/utils/KyuubiUtils.java
new file mode 100644
index 0000000000..1782b3d359
--- /dev/null
+++ 
b/extensions/flink/kyuubi-flink-token-provider/src/main/java/org/apache/kyuubi/flink/security/token/utils/KyuubiUtils.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.flink.security.token.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.*;
+import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KyuubiUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KyuubiUtils.class);
+
+  public static final String KYUUBI_ENGINE_CREDENTIALS_KEY = 
"kyuubi.engine.credentials";
+  public static final String KYUUBI_CREDENTIALS_RENEWAL_INTERVAL_KEY =
+      "kyuubi.credentials.renewal.interval";
+  public static final Long KYUUBI_CREDENTIALS_RENEWAL_INTERVAL_DEFAULT = 
360000L;
+
+  // org.apache.kyuubi.Utils.fromCommandLineArgs
+  public static Map<String, String> fromCommandLineArgs(List<String> args) {
+    assert args.size() % 2 == 0 : "Illegal size of arguments.";
+
+    Map<String, String> conf = new HashMap<>();
+    for (int i = 0; i < args.size(); i++) {
+      String confKey = args.get(i);
+      String confValues = args.get(++i);
+      assert confKey.equals("--conf")
+          : "Unrecognized main arguments prefix "
+              + confKey
+              + ", the argument format is '--conf k=v'.";
+      String[] confItem = confValues.split("=", 2);
+      if (confItem.length == 2) {
+        conf.put(confItem[0].trim(), confItem[1].trim());
+      } else {
+        throw new IllegalArgumentException("Illegal argument: " + confValues + 
".");
+      }
+    }
+
+    return conf;
+  }
+
+  public static void renewDelegationToken(String delegationToken)
+      throws NoSuchFieldException, IllegalAccessException, IOException {
+    Credentials newCreds = decodeCredentials(delegationToken);
+    Map<Text, Token<? extends TokenIdentifier>> newTokens = 
getTokenMap(newCreds);
+
+    Credentials updateCreds = new Credentials();
+    Credentials oldCreds = 
UserGroupInformation.getCurrentUser().getCredentials();
+    for (Map.Entry<Text, Token<? extends TokenIdentifier>> entry : 
newTokens.entrySet()) {
+      Text alias = entry.getKey();
+      Token<? extends TokenIdentifier> newToken = entry.getValue();
+      Token<? extends TokenIdentifier> oldToken = 
oldCreds.getToken(entry.getKey());
+      if (oldToken != null) {
+        if (compareIssueDate(newToken, oldToken) > 0) {
+          updateCreds.addToken(alias, newToken);
+        } else {
+          LOG.warn("Ignore token with earlier issue date: {}", newToken);
+        }
+      } else {
+        LOG.info("Add new unknown token {}", newToken);
+        updateCreds.addToken(alias, newToken);
+      }
+    }
+
+    if (updateCreds.numberOfTokens() > 0) {
+      LOG.info(
+          "Update delegation tokens. The number of tokens sent by the server 
is {}. "
+              + "The actual number of updated tokens is {}.",
+          newCreds.numberOfTokens(),
+          updateCreds.numberOfTokens());
+      UserGroupInformation.getCurrentUser().addCredentials(updateCreds);
+    }
+  }
+
+  public static Credentials decodeCredentials(String newValue) throws 
IOException {
+    byte[] decoded = Base64.getDecoder().decode(newValue);
+    ByteArrayInputStream byteStream = new ByteArrayInputStream(decoded);
+    Credentials creds = new Credentials();
+    creds.readTokenStorageStream(new DataInputStream(byteStream));
+    return creds;
+  }
+
+  /**
+   * Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]] 
is not present before
+   * Hadoop 3.2.1.
+   */
+  public static Map<Text, Token<? extends TokenIdentifier>> 
getTokenMap(Credentials credentials)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field tokenMap = Credentials.class.getDeclaredField("tokenMap");
+    tokenMap.setAccessible(true);
+    return (Map<Text, Token<? extends TokenIdentifier>>) 
tokenMap.get(credentials);
+  }
+
+  public static int compareIssueDate(
+      Token<? extends TokenIdentifier> newToken, Token<? extends 
TokenIdentifier> oldToken)
+      throws IOException {
+    Optional<Long> newDate = getTokenIssueDate(newToken);
+    Optional<Long> oldDate = getTokenIssueDate(oldToken);
+    if (newDate.isPresent() && oldDate.isPresent() && newDate.get() <= 
oldDate.get()) {
+      return -1;
+    } else {
+      return 1;
+    }
+  }
+
+  public static Optional<Long> getTokenIssueDate(Token<? extends 
TokenIdentifier> token)
+      throws IOException {
+    TokenIdentifier identifier = token.decodeIdentifier();
+    if (identifier instanceof AbstractDelegationTokenIdentifier) {
+      return Optional.of(((AbstractDelegationTokenIdentifier) 
identifier).getIssueDate());
+    }
+    if (identifier == null) {
+      // TokenIdentifiers not found in ServiceLoader
+      DelegationTokenIdentifier tokenIdentifier = new 
DelegationTokenIdentifier();
+      ByteArrayInputStream buf = new 
ByteArrayInputStream(token.getIdentifier());
+      DataInputStream in = new DataInputStream(buf);
+      try {
+        tokenIdentifier.readFields(in);
+        return Optional.of(tokenIdentifier.getIssueDate());
+      } catch (Exception e) {
+        LOG.warn("Can not decode identifier of token {}, error: {}", token, e);
+        return Optional.empty();
+      }
+    }
+    LOG.debug("Unsupported TokenIdentifier kind: {}", identifier.getKind());
+    return Optional.empty();
+  }
+}
diff --git 
a/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
 
b/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
new file mode 100644
index 0000000000..28171e5ad0
--- /dev/null
+++ 
b/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.kyuubi.flink.security.token.KyuubiDelegationTokenProvider
diff --git 
a/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
 
b/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
new file mode 100644
index 0000000000..dc318d23d2
--- /dev/null
+++ 
b/extensions/flink/kyuubi-flink-token-provider/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.kyuubi.flink.security.token.KyuubiDelegationTokenReceiver
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index 4293ef19f4..1f4e56920e 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -36,9 +36,10 @@ import 
org.apache.flink.table.gateway.service.context.{DefaultContext, SessionCo
 import org.apache.flink.table.gateway.service.result.ResultFetcher
 import org.apache.flink.table.gateway.service.session.Session
 import org.apache.flink.util.JarUtils
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.kyuubi.{KyuubiException, Logging}
-import org.apache.kyuubi.util.SemanticVersion
+import org.apache.kyuubi.util.{KyuubiHadoopUtils, SemanticVersion}
 import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 object FlinkEngineUtils extends Logging {
@@ -166,4 +167,32 @@ object FlinkEngineUtils extends Logging {
       }).toList
     } else null
   }
+
+  def renewDelegationToken(delegationToken: String): Unit = {
+    val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken)
+    val newTokens = KyuubiHadoopUtils.getTokenMap(newCreds)
+
+    val updateCreds = new Credentials()
+    val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
+    newTokens.foreach { case (alias, newToken) =>
+      val oldToken = oldCreds.getToken(alias)
+      if (oldToken != null) {
+        if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
+          updateCreds.addToken(alias, newToken)
+        } else {
+          warn(s"Ignore token with earlier issue date: $newToken")
+        }
+      } else {
+        info(s"Add new unknown token $newToken")
+        updateCreds.addToken(alias, newToken)
+      }
+    }
+
+    if (updateCreds.numberOfTokens() > 0) {
+      info("Update delegation tokens. " +
+        s"The number of tokens sent by the server is 
${newCreds.numberOfTokens()}. " +
+        s"The actual number of updated tokens is 
${updateCreds.numberOfTokens()}.")
+      UserGroupInformation.getCurrentUser.addCredentials(updateCreds)
+    }
+  }
 }
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 83149735fb..bc772ced8f 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -102,7 +102,7 @@ object FlinkSQLEngine extends Logging {
       val engineCredentials = 
kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
       kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
       engineCredentials.filter(_.nonEmpty).foreach { credentials =>
-        FlinkTBinaryFrontendService.renewDelegationToken(credentials)
+        FlinkEngineUtils.renewDelegationToken(credentials)
       }
 
       val engineContext = FlinkEngineUtils.getDefaultContext(args, flinkConf, 
flinkConfDir)
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
index 79a6dff967..f2161599e4 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
@@ -17,15 +17,12 @@
 
 package org.apache.kyuubi.engine.flink
 
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import 
org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService.renewDelegationToken
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils.renewDelegationToken
 import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
 import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
 import org.apache.kyuubi.service.TFrontendService.OK_STATUS
 import 
org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TRenewDelegationTokenReq, 
TRenewDelegationTokenResp}
-import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 class FlinkTBinaryFrontendService(
     override val serverable: Serverable)
@@ -56,33 +53,3 @@ class FlinkTBinaryFrontendService(
   }
 
 }
-
-object FlinkTBinaryFrontendService extends Logging {
-  private[flink] def renewDelegationToken(delegationToken: String): Unit = {
-    val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken)
-    val newTokens = KyuubiHadoopUtils.getTokenMap(newCreds)
-
-    val updateCreds = new Credentials()
-    val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
-    newTokens.foreach { case (alias, newToken) =>
-      val oldToken = oldCreds.getToken(alias)
-      if (oldToken != null) {
-        if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
-          updateCreds.addToken(alias, newToken)
-        } else {
-          warn(s"Ignore token with earlier issue date: $newToken")
-        }
-      } else {
-        info(s"Add new unknown token $newToken")
-        updateCreds.addToken(alias, newToken)
-      }
-    }
-
-    if (updateCreds.numberOfTokens() > 0) {
-      info("Update delegation tokens. " +
-        s"The number of tokens sent by the server is 
${newCreds.numberOfTokens()}. " +
-        s"The actual number of updated tokens is 
${updateCreds.numberOfTokens()}.")
-      UserGroupInformation.getCurrentUser.addCredentials(updateCreds)
-    }
-  }
-}
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 901703924e..436ac5fd16 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -3017,6 +3017,26 @@ object KyuubiConf {
       .version("1.8.1")
       .fallbackConf(ENGINE_INITIALIZE_SQL)
 
+  val ENGINE_FLINK_DOAS_ENABLED: ConfigEntry[Boolean] =
+    buildConf("kyuubi.engine.flink.doAs.enabled")
+      .doc("When enabled, the session user is used as the proxy user to launch 
the Flink engine," +
+        " otherwise, the server user. Note, due to the limitation of Apache 
Flink," +
+        " it can only be enabled on Kerberized environment.")
+      .version("1.10.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val ENGINE_FLINK_DOAS_GENERATE_TOKEN_FILE: ConfigEntry[Boolean] =
+    buildConf("kyuubi.engine.flink.doAs.generateTokenFile")
+      .internal
+      .doc(s"When ${ENGINE_FLINK_DOAS_ENABLED.key}=true and neither 
FLINK-35525 (Flink 1.20.0)" +
+        " nor YARN-10333 (Hadoop 3.4.0) is available, enable this 
configuration to generate a" +
+        " temporary HADOOP_TOKEN_FILE that will be picked up by the Flink 
engine bootstrap" +
+        " process.")
+      .version("1.10.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
     buildConf("kyuubi.server.limit.connections.per.user")
       .doc("Maximum kyuubi server connections per user." +
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 9597c974f9..9f6e7e68c1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -24,14 +24,18 @@ import scala.collection.mutable
 
 import com.google.common.annotations.VisibleForTesting
 import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
+import 
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, 
KYUUBI_SESSION_USER_KEY}
 import org.apache.kyuubi.engine.{ApplicationManagerInfo, 
KyuubiApplicationManager, ProcBuilder}
 import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
 import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.KyuubiHadoopUtils
 import org.apache.kyuubi.util.command.CommandLineUtils._
 
 /**
@@ -59,14 +63,31 @@ class FlinkProcessBuilder(
   // flink.execution.target are required in Kyuubi conf currently
   val executionTarget: Option[String] = 
conf.getOption("flink.execution.target")
 
+  private lazy val proxyUserEnable: Boolean = {
+    var flinkDoAsEnabled = conf.get(ENGINE_FLINK_DOAS_ENABLED)
+    if (flinkDoAsEnabled && !UserGroupInformation.isSecurityEnabled) {
+      warn(s"${ENGINE_FLINK_DOAS_ENABLED.key} can only be enabled on 
Kerberized environment.")
+      flinkDoAsEnabled = false
+    }
+    flinkDoAsEnabled
+  }
+
   override protected def module: String = "kyuubi-flink-sql-engine"
 
   override protected def mainClass: String = 
"org.apache.kyuubi.engine.flink.FlinkSQLEngine"
 
-  override def env: Map[String, String] = conf.getEnvs +
-    ("FLINK_CONF_DIR" -> conf.getEnvs.getOrElse(
-      "FLINK_CONF_DIR",
-      s"$flinkHome${File.separator}conf"))
+  override def env: Map[String, String] = {
+    val flinkConfDir =
+      conf.getEnvs.getOrElse("FLINK_CONF_DIR", 
s"$flinkHome${File.separator}conf")
+    val flinkExtraEnvs = if (proxyUserEnable) {
+      Map(
+        "FLINK_CONF_DIR" -> flinkConfDir,
+        FLINK_PROXY_USER_KEY -> proxyUser) ++ generateTokenFile()
+    } else {
+      Map("FLINK_CONF_DIR" -> flinkConfDir)
+    }
+    conf.getEnvs ++ flinkExtraEnvs
+  }
 
   override def clusterManager(): Option[String] = {
     executionTarget match {
@@ -114,7 +135,29 @@ class FlinkProcessBuilder(
           flinkExtraJars += s"$hiveConfFile"
         }
 
-        val customFlinkConf = conf.getAllWithPrefix(FLINK_CONF_PREFIX, "")
+        val externalProxyUserConf: Map[String, String] = if (proxyUserEnable) {
+          // FLINK-31109 (1.17.0): Flink only supports hadoop proxy user when 
delegation tokens
+          // fetch is managed outside, but disabling 
`security.delegation.tokens.enabled` will cause
+          // delegation token updates on JobManager not to be passed to 
TaskManagers.
+          // Based on the solution in
+          // 
https://github.com/apache/flink/pull/22009#issuecomment-2122226755, we removed
+          // `HadoopModuleFactory` from `security.module.factory.classes` and 
disabled delegation
+          // token providers (hadoopfs/hbase/HiveServer2) that do not support 
proxyUser.
+          // FLINK-35525: We need to add 
`yarn.security.appmaster.delegation.token.services=kyuubi`
+          // configuration to pass hdfs token obtained by kyuubi provider to 
the yarn client.
+          Map(
+            "security.module.factory.classes" ->
+              ("org.apache.flink.runtime.security.modules.JaasModuleFactory;" +
+                
"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory"),
+            "security.delegation.token.provider.hadoopfs.enabled" -> "false",
+            "security.delegation.token.provider.hbase.enabled" -> "false",
+            "security.delegation.token.provider.HiveServer2.enabled" -> 
"false",
+            "yarn.security.appmaster.delegation.token.services" -> "kyuubi")
+        } else {
+          Map.empty
+        }
+
+        val customFlinkConf = conf.getAllWithPrefix(FLINK_CONF_PREFIX, "") ++ 
externalProxyUserConf
         // add custom yarn.ship-files
         flinkExtraJars ++= customFlinkConf.get(YARN_SHIP_FILES_KEY)
         val yarnAppName = customFlinkConf.get(YARN_APPLICATION_NAME_KEY)
@@ -214,6 +257,37 @@ class FlinkProcessBuilder(
     }
   }
 
+  @volatile private var tokenTempDir: java.nio.file.Path = _
+  private def generateTokenFile(): Option[(String, String)] = {
+    if (conf.get(ENGINE_FLINK_DOAS_GENERATE_TOKEN_FILE)) {
+      // We disabled `hadoopfs` token service, which may cause yarn client to 
miss hdfs token.
+      // So we generate a hadoop token file to pass kyuubi engine tokens to 
submit process.
+      // TODO: Removed this after FLINK-35525 (1.20.0), delegation tokens will 
be passed
+      //  by `kyuubi` provider
+      conf.getOption(KYUUBI_ENGINE_CREDENTIALS_KEY).map { encodedCredentials =>
+        val credentials = 
KyuubiHadoopUtils.decodeCredentials(encodedCredentials)
+        tokenTempDir = Utils.createTempDir()
+        val file = 
s"${tokenTempDir.toString}/kyuubi_credentials_${System.currentTimeMillis()}"
+        credentials.writeTokenStorageFile(new Path(s"file://$file"), new 
Configuration())
+        info(s"Generated hadoop token file: $file")
+        "HADOOP_TOKEN_FILE_LOCATION" -> file
+      }
+    } else {
+      None
+    }
+  }
+
+  override def close(destroyProcess: Boolean): Unit = {
+    super.close(destroyProcess)
+    if (tokenTempDir != null) {
+      try {
+        Utils.deleteDirectoryRecursively(tokenTempDir.toFile)
+      } catch {
+        case e: Throwable => error(s"Error deleting token temp dir: 
$tokenTempDir", e)
+      }
+    }
+  }
+
   override def shortName: String = "flink"
 }
 
@@ -227,4 +301,6 @@ object FlinkProcessBuilder {
 
   final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
   final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
+  final val FLINK_SECURITY_KEYTAB_KEY = "security.kerberos.login.keytab"
+  final val FLINK_SECURITY_PRINCIPAL_KEY = "security.kerberos.login.principal"
 }
diff --git a/pom.xml b/pom.xml
index b2eafa76a8..18eb69ef12 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
     <modules>
         <module>dev/kyuubi-codecov</module>
         <module>extensions/server/kyuubi-server-plugin</module>
+        <module>extensions/flink/kyuubi-flink-token-provider</module>
         <module>extensions/spark/kyuubi-extension-spark-jdbc-dialect</module>
         <module>extensions/spark/kyuubi-spark-authz</module>
         <module>extensions/spark/kyuubi-spark-authz-shaded</module>

Reply via email to