This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cbecdc12 [KYUUBI #6367] Flink SQL engine supports RenewDelegationToken
4cbecdc12 is described below

commit 4cbecdc12f1f930f5375dddf02c3e545716b3400
Author: wforget <[email protected]>
AuthorDate: Fri May 24 12:15:44 2024 +0800

    [KYUUBI #6367] Flink SQL engine supports RenewDelegationToken
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #6367
    
    ## Describe Your Solution 🔧
    
    + Implement the `RenewDelegationToken` method in `FlinkTBinaryFrontendService`.
    + Pass the `kyuubi.engine.credentials` configuration when starting the Flink engine (a condensed sketch follows this list).
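    
    A condensed sketch of the engine-side flow introduced here (names taken from the patch below; logging and error handling are elided):
    
    ```
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}
    import org.apache.kyuubi.util.KyuubiHadoopUtils
    
    // Decode the credentials shipped in the RenewDelegationToken payload and
    // merge tokens that are unknown locally or strictly newer into the current UGI.
    def renewDelegationToken(delegationToken: String): Unit = {
      val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken)
      val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
      val updateCreds = new Credentials()
      KyuubiHadoopUtils.getTokenMap(newCreds).foreach { case (alias, newToken) =>
        val oldToken = oldCreds.getToken(alias)
        if (oldToken == null || KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
          updateCreds.addToken(alias, newToken)
        }
      }
      if (updateCreds.numberOfTokens() > 0) {
        UserGroupInformation.getCurrentUser.addCredentials(updateCreds)
      }
    }
    ```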
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [X] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    test connection:
    
    ```
    "jdbc:hive2://hadoop-master1.orb.local:10009/default;hive.server2.proxy.user=spark;principal=kyuubi/_HOST@TEST.ORG?kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application"
    ```
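    
    For reference, a minimal sketch of exercising this connection from plain JDBC, assuming the Hive JDBC driver is on the classpath (it self-registers via the JDBC `ServiceLoader` mechanism) and a Kerberos ticket was obtained beforehand, e.g. via `kinit`:
    
    ```
    import java.sql.DriverManager
    
    // Same URL as above; host, proxy user, and realm are environment-specific.
    val url = "jdbc:hive2://hadoop-master1.orb.local:10009/default;" +
      "hive.server2.proxy.user=spark;principal=kyuubi/_HOST@TEST.ORG" +
      "?kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application"
    
    val conn = DriverManager.getConnection(url)
    try {
      // A trivial query is enough to force engine bootstrap on the Flink side.
      val rs = conn.createStatement().executeQuery("SELECT 1")
      while (rs.next()) println(rs.getInt(1))
    } finally {
      conn.close()
    }
    ```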
    
    flink engine builder command:
    
    ![image](https://github.com/apache/kyuubi/assets/17894939/dcdb8466-c423-464d-8119-9c4236f17ce7)
    
    jobmanager log:
    
    ```
    2024-05-22 07:46:46,545 INFO  org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService   [] - Add new unknown token Kind: HIVE_DELEGATION_TOKEN, Service: , Ident: 00 05 73 70 61 72 6b 04 68 69 76 65 28 6b 79 75 75 62 69 2f 68 61 64 6f 6f 70 2d 6d 61 73 74 65 72 31 2e 6f 72 62 2e 6c 6f 63 61 6c 40 54 45 53 54 2e 4f 52 47 8a 01 8f 9f 3f d5 4c 8a 01 8f c3 4c 59 4c 0b 06
    2024-05-22 07:46:46,547 WARN  org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService   [] - Ignore token with earlier issue date: Kind: HDFS_DELEGATION_TOKEN, Service: 172.20.0.5:8020, Ident: (token for spark: HDFS_DELEGATION_TOKEN owner=spark, renewer=spark, realUser=kyuubi/hadoop-master1.orb.local@TEST.ORG, issueDate=1716363711750, maxDate=1716968511750, sequenceNumber=15, masterKeyId=7)
    2024-05-22 07:46:46,548 INFO  org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService   [] - Update delegation tokens. The number of tokens sent by the server is 2. The actual number of updated tokens is 1.
    ```
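    
    The log exercises both branches of the update logic: the Hive delegation token is unknown to the engine and is added, while the HDFS token does not carry a later issue date than the one already held and is ignored, hence 2 tokens sent and 1 updated. Note that `KyuubiHadoopUtils.compareIssueDate` treats a token whose issue date cannot be determined as newer, so such tokens are always accepted.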
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6371 from wForget/KYUUBI-6367.
    
    Closes #6367
    
    83b402aa0 [wforget] Revert "change Base64 encoder/decoder"
    f5c08eb45 [wforget] change Base64 encoder/decoder
    e8c66dfc5 [wforget] fix test
    e59820b3e [wforget] [KYUUBI #6367] Support RenewDelegationToken for flink sql engine
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/flink/FlinkSQLEngine.scala       |  8 +++-
 .../engine/flink/FlinkTBinaryFrontendService.scala | 53 ++++++++++++++++++++++
 .../engine/spark/SparkTBinaryFrontendService.scala | 16 +------
 .../org/apache/kyuubi/util/KyuubiHadoopUtils.scala | 12 +++++
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  |  4 +-
 .../engine/flink/FlinkProcessBuilderSuite.scala    |  1 -
 6 files changed, 75 insertions(+), 19 deletions(-)
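
`KyuubiHadoopUtils.decodeCredentials` itself is not part of this patch. As a rough mental model only (an assumption about the wire format, not the actual implementation), the server serializes a Hadoop `Credentials` object into a Base64 string and the engine reverses it through Hadoop's token storage format:

```
import java.io.{ByteArrayInputStream, DataInputStream}
import java.util.Base64

import org.apache.hadoop.security.Credentials

// Hypothetical decoder sketch: Base64-decode the RenewDelegationToken
// payload and replay it through Credentials.readTokenStorageStream.
def decodeCredentials(encoded: String): Credentials = {
  val bytes = Base64.getMimeDecoder.decode(encoded)
  val creds = new Credentials()
  creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(bytes)))
  creds
}
```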

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index dff9aa602..85232688e 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext
 
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_FLINK_INITIALIZE_SQL
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
 import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
@@ -99,6 +99,12 @@ object FlinkSQLEngine extends Logging {
 
       kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
 
+      val engineCredentials = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
+      kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
+      engineCredentials.filter(_.nonEmpty).foreach { credentials =>
+        FlinkTBinaryFrontendService.renewDelegationToken(credentials)
+      }
+
       val engineContext = FlinkEngineUtils.getDefaultContext(args, flinkConf, flinkConfDir)
       startEngine(engineContext)
       info("Flink engine started")
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
index a709ea760..79a6dff96 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
@@ -17,13 +17,36 @@
 
 package org.apache.kyuubi.engine.flink
 
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService.renewDelegationToken
 import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
 import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
+import org.apache.kyuubi.service.TFrontendService.OK_STATUS
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TRenewDelegationTokenReq, TRenewDelegationTokenResp}
+import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 class FlinkTBinaryFrontendService(
     override val serverable: Serverable)
   extends TBinaryFrontendService("FlinkThriftBinaryFrontendService") {
 
+  override def RenewDelegationToken(req: TRenewDelegationTokenReq): TRenewDelegationTokenResp = {
+    debug(req.toString)
+    // We hacked `TCLIService.Iface.RenewDelegationToken` to transfer Credentials from Kyuubi
+    // Server to Flink SQL engine
+    val resp = new TRenewDelegationTokenResp()
+    try {
+      renewDelegationToken(req.getDelegationToken)
+      resp.setStatus(OK_STATUS)
+    } catch {
+      case e: Exception =>
+        warn("Error renewing delegation tokens: ", e)
+        resp.setStatus(KyuubiSQLException.toTStatus(e))
+    }
+    resp
+  }
+
   override lazy val discoveryService: Option[Service] = {
     if (ServiceDiscovery.supportServiceDiscovery(conf)) {
       Some(new EngineServiceDiscovery(this))
@@ -33,3 +56,33 @@ class FlinkTBinaryFrontendService(
   }
 
 }
+
+object FlinkTBinaryFrontendService extends Logging {
+  private[flink] def renewDelegationToken(delegationToken: String): Unit = {
+    val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken)
+    val newTokens = KyuubiHadoopUtils.getTokenMap(newCreds)
+
+    val updateCreds = new Credentials()
+    val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
+    newTokens.foreach { case (alias, newToken) =>
+      val oldToken = oldCreds.getToken(alias)
+      if (oldToken != null) {
+        if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
+          updateCreds.addToken(alias, newToken)
+        } else {
+          warn(s"Ignore token with earlier issue date: $newToken")
+        }
+      } else {
+        info(s"Add new unknown token $newToken")
+        updateCreds.addToken(alias, newToken)
+      }
+    }
+
+    if (updateCreds.numberOfTokens() > 0) {
+      info("Update delegation tokens. " +
+        s"The number of tokens sent by the server is ${newCreds.numberOfTokens()}. " +
+        s"The actual number of updated tokens is ${updateCreds.numberOfTokens()}.")
+      UserGroupInformation.getCurrentUser.addCredentials(updateCreds)
+    }
+  }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
index 7ca2e8fbe..2eed5253d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
@@ -162,7 +162,7 @@ object SparkTBinaryFrontendService extends Logging {
         }
         .map(_._2)
       newToken.foreach { token =>
-        if (compareIssueDate(token, oldAliasAndToken.get._2) > 0) {
+        if (KyuubiHadoopUtils.compareIssueDate(token, oldAliasAndToken.get._2) > 0) {
           updateCreds.addToken(oldAliasAndToken.get._1, token)
         } else {
           warn(s"Ignore Hive token with earlier issue date: $token")
@@ -186,7 +186,7 @@ object SparkTBinaryFrontendService extends Logging {
     tokens.foreach { case (alias, newToken) =>
       val oldToken = oldCreds.getToken(alias)
       if (oldToken != null) {
-        if (compareIssueDate(newToken, oldToken) > 0) {
+        if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
           updateCreds.addToken(alias, newToken)
         } else {
           warn(s"Ignore token with earlier issue date: $newToken")
@@ -197,18 +197,6 @@ object SparkTBinaryFrontendService extends Logging {
     }
   }
 
-  private def compareIssueDate(
-      newToken: Token[_ <: TokenIdentifier],
-      oldToken: Token[_ <: TokenIdentifier]): Int = {
-    val newDate = KyuubiHadoopUtils.getTokenIssueDate(newToken)
-    val oldDate = KyuubiHadoopUtils.getTokenIssueDate(oldToken)
-    if (newDate.isDefined && oldDate.isDefined && newDate.get <= oldDate.get) {
-      -1
-    } else {
-      1
-    }
-  }
-
   private[kyuubi] def hiveConf(hadoopConf: Configuration): Configuration = {
     if (_hiveConf == null) {
       synchronized {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index a3047d0f4..93135ba3e 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -116,6 +116,18 @@ object KyuubiHadoopUtils extends Logging {
     }
   }
 
+  def compareIssueDate(
+      newToken: Token[_ <: TokenIdentifier],
+      oldToken: Token[_ <: TokenIdentifier]): Int = {
+    val newDate = KyuubiHadoopUtils.getTokenIssueDate(newToken)
+    val oldDate = KyuubiHadoopUtils.getTokenIssueDate(oldToken)
+    if (newDate.isDefined && oldDate.isDefined && newDate.get <= oldDate.get) {
+      -1
+    } else {
+      1
+    }
+  }
+
   /**
    * Add a path variable to the given environment map.
   * If the map already contains this key, append the value to the existing value instead.
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 241b7ec78..3fa7ea50a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import com.google.common.annotations.VisibleForTesting
 
 import org.apache.kyuubi._
-import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
+import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
@@ -80,8 +80,6 @@ class FlinkProcessBuilder(
 
   override protected val commands: Iterable[String] = {
     KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
-    // unset engine credentials because Flink doesn't support them at the moment
-    conf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
     // flink.execution.target are required in Kyuubi conf currently
     executionTarget match {
       case Some("yarn-application") =>
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 952f71c08..8786ef798 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -72,7 +72,6 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
     (FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
   private def confStr: String = {
     sessionModeConf.clone.getAll
-      .filter(!_._1.equals(KYUUBI_ENGINE_CREDENTIALS_KEY))
       .map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" }
       .mkString(" ")
   }
