This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4cbecdc12 [KYUUBI #6367] Flink SQL engine supports RenewDelegationToken
4cbecdc12 is described below
commit 4cbecdc12f1f930f5375dddf02c3e545716b3400
Author: wforget <[email protected]>
AuthorDate: Fri May 24 12:15:44 2024 +0800
[KYUUBI #6367] Flink SQL engine supports RenewDelegationToken
# :mag: Description
## Issue References 🔗
This pull request fixes #6367
## Describe Your Solution 🔧
+ Implement the `RenewDelegationToken` method in `FlinkTBinaryFrontendService`.
+ Pass the `kyuubi.engine.credentials` configuration when starting the Flink engine.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [X] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
test connection:
```
"jdbc:hive2://hadoop-master1.orb.local:10009/default;hive.server2.proxy.user=spark;principal=kyuubi/_HOST@TEST.ORG?kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application"
```
flink engine builder command:

jobmanager log:
```
2024-05-22 07:46:46,545 INFO
org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService [] - Add new
unknown token Kind: HIVE_DELEGATION_TOKEN, Service: , Ident: 00 05 73 70 61 72
6b 04 68 69 76 65 28 6b 79 75 75 62 69 2f 68 61 64 6f 6f 70 2d 6d 61 73 74 65
72 31 2e 6f 72 62 2e 6c 6f 63 61 6c 40 54 45 53 54 2e 4f 52 47 8a 01 8f 9f 3f
d5 4c 8a 01 8f c3 4c 59 4c 0b 06
2024-05-22 07:46:46,547 WARN
org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService [] - Ignore token
with earlier issue date: Kind: HDFS_DELEGATION_TOKEN, Service: 172.20.0.5:8020,
Ident: (token for spark: HDFS_DELEGATION_TOKEN owner=spark, renewer=spark,
realUser=kyuubi/hadoop-master1.orb.local@TEST.ORG, issueDate=1716363711750,
maxDate=1716968511750, sequenceNumber=15, masterKeyId=7)
2024-05-22 07:46:46,548 INFO
org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService [] - Update
delegation tokens. The number of tokens sent by the server is 2. The actual
number of updated tokens is 1.
```
#### Related Unit Tests
---
# Checklist 📝
- [X] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6371 from wForget/KYUUBI-6367.
Closes #6367
83b402aa0 [wforget] Revert "change Base64 encoder/decoder"
f5c08eb45 [wforget] change Base64 encoder/decoder
e8c66dfc5 [wforget] fix test
e59820b3e [wforget] [KYUUBI #6367] Support RenewDelegationToken for flink
sql engine
Authored-by: wforget <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/flink/FlinkSQLEngine.scala | 8 +++-
.../engine/flink/FlinkTBinaryFrontendService.scala | 53 ++++++++++++++++++++++
.../engine/spark/SparkTBinaryFrontendService.scala | 16 +------
.../org/apache/kyuubi/util/KyuubiHadoopUtils.scala | 12 +++++
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 4 +-
.../engine/flink/FlinkProcessBuilderSuite.scala | 1 -
6 files changed, 75 insertions(+), 19 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index dff9aa602..85232688e 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -31,7 +31,7 @@ import
org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, currentUser,
FLINK_ENGINE_SHUTDOWN_PRIORITY}
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.ENGINE_FLINK_INITIALIZE_SQL
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME,
KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch,
currentEngine}
@@ -99,6 +99,12 @@ object FlinkSQLEngine extends Logging {
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ val engineCredentials =
kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
+ kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
+ engineCredentials.filter(_.nonEmpty).foreach { credentials =>
+ FlinkTBinaryFrontendService.renewDelegationToken(credentials)
+ }
+
val engineContext = FlinkEngineUtils.getDefaultContext(args, flinkConf,
flinkConfDir)
startEngine(engineContext)
info("Flink engine started")
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
index a709ea760..79a6dff96 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala
@@ -17,13 +17,36 @@
package org.apache.kyuubi.engine.flink
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import
org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService.renewDelegationToken
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
+import org.apache.kyuubi.service.TFrontendService.OK_STATUS
+import
org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TRenewDelegationTokenReq,
TRenewDelegationTokenResp}
+import org.apache.kyuubi.util.KyuubiHadoopUtils
class FlinkTBinaryFrontendService(
override val serverable: Serverable)
extends TBinaryFrontendService("FlinkThriftBinaryFrontendService") {
+ override def RenewDelegationToken(req: TRenewDelegationTokenReq):
TRenewDelegationTokenResp = {
+ debug(req.toString)
+ // We hacked `TCLIService.Iface.RenewDelegationToken` to transfer
Credentials from Kyuubi
+ // Server to Flink SQL engine
+ val resp = new TRenewDelegationTokenResp()
+ try {
+ renewDelegationToken(req.getDelegationToken)
+ resp.setStatus(OK_STATUS)
+ } catch {
+ case e: Exception =>
+ warn("Error renew delegation tokens: ", e)
+ resp.setStatus(KyuubiSQLException.toTStatus(e))
+ }
+ resp
+ }
+
override lazy val discoveryService: Option[Service] = {
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
Some(new EngineServiceDiscovery(this))
@@ -33,3 +56,33 @@ class FlinkTBinaryFrontendService(
}
}
+
+object FlinkTBinaryFrontendService extends Logging {
+ private[flink] def renewDelegationToken(delegationToken: String): Unit = {
+ val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken)
+ val newTokens = KyuubiHadoopUtils.getTokenMap(newCreds)
+
+ val updateCreds = new Credentials()
+ val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
+ newTokens.foreach { case (alias, newToken) =>
+ val oldToken = oldCreds.getToken(alias)
+ if (oldToken != null) {
+ if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
+ updateCreds.addToken(alias, newToken)
+ } else {
+ warn(s"Ignore token with earlier issue date: $newToken")
+ }
+ } else {
+ info(s"Add new unknown token $newToken")
+ updateCreds.addToken(alias, newToken)
+ }
+ }
+
+ if (updateCreds.numberOfTokens() > 0) {
+ info("Update delegation tokens. " +
+ s"The number of tokens sent by the server is
${newCreds.numberOfTokens()}. " +
+ s"The actual number of updated tokens is
${updateCreds.numberOfTokens()}.")
+ UserGroupInformation.getCurrentUser.addCredentials(updateCreds)
+ }
+ }
+}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
index 7ca2e8fbe..2eed5253d 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
@@ -162,7 +162,7 @@ object SparkTBinaryFrontendService extends Logging {
}
.map(_._2)
newToken.foreach { token =>
- if (compareIssueDate(token, oldAliasAndToken.get._2) > 0) {
+ if (KyuubiHadoopUtils.compareIssueDate(token, oldAliasAndToken.get._2)
> 0) {
updateCreds.addToken(oldAliasAndToken.get._1, token)
} else {
warn(s"Ignore Hive token with earlier issue date: $token")
@@ -186,7 +186,7 @@ object SparkTBinaryFrontendService extends Logging {
tokens.foreach { case (alias, newToken) =>
val oldToken = oldCreds.getToken(alias)
if (oldToken != null) {
- if (compareIssueDate(newToken, oldToken) > 0) {
+ if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
updateCreds.addToken(alias, newToken)
} else {
warn(s"Ignore token with earlier issue date: $newToken")
@@ -197,18 +197,6 @@ object SparkTBinaryFrontendService extends Logging {
}
}
- private def compareIssueDate(
- newToken: Token[_ <: TokenIdentifier],
- oldToken: Token[_ <: TokenIdentifier]): Int = {
- val newDate = KyuubiHadoopUtils.getTokenIssueDate(newToken)
- val oldDate = KyuubiHadoopUtils.getTokenIssueDate(oldToken)
- if (newDate.isDefined && oldDate.isDefined && newDate.get <= oldDate.get) {
- -1
- } else {
- 1
- }
- }
-
private[kyuubi] def hiveConf(hadoopConf: Configuration): Configuration = {
if (_hiveConf == null) {
synchronized {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index a3047d0f4..93135ba3e 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -116,6 +116,18 @@ object KyuubiHadoopUtils extends Logging {
}
}
+ def compareIssueDate(
+ newToken: Token[_ <: TokenIdentifier],
+ oldToken: Token[_ <: TokenIdentifier]): Int = {
+ val newDate = KyuubiHadoopUtils.getTokenIssueDate(newToken)
+ val oldDate = KyuubiHadoopUtils.getTokenIssueDate(oldToken)
+ if (newDate.isDefined && oldDate.isDefined && newDate.get <= oldDate.get) {
+ -1
+ } else {
+ 1
+ }
+ }
+
/**
* Add a path variable to the given environment map.
* If the map already contains this key, append the value to the existing
value instead.
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 241b7ec78..3fa7ea50a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi._
-import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
+import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.{ApplicationManagerInfo,
KyuubiApplicationManager, ProcBuilder}
@@ -80,8 +80,6 @@ class FlinkProcessBuilder(
override protected val commands: Iterable[String] = {
KyuubiApplicationManager.tagApplication(engineRefId, shortName,
clusterManager(), conf)
- // unset engine credentials because Flink doesn't support them at the
moment
- conf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
// flink.execution.target are required in Kyuubi conf currently
executionTarget match {
case Some("yarn-application") =>
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 952f71c08..8786ef798 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -72,7 +72,6 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
(FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
private def confStr: String = {
sessionModeConf.clone.getAll
- .filter(!_._1.equals(KYUUBI_ENGINE_CREDENTIALS_KEY))
.map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" }
.mkString(" ")
}