This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 1b89272 [KYUUBI #1009] Implement Hive metastore server Delegation
Token Provider
1b89272 is described below
commit 1b8927274999ef5519309032873b43d3b9c11569
Author: zhouyifan279 <[email protected]>
AuthorDate: Thu Sep 9 19:19:58 2021 +0800
[KYUUBI #1009] Implement Hive metastore server Delegation Token Provider
### _Why are the changes needed?_
This PR finishes the work of issue #1009
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1053 from zhouyifan279/KYUUBI#1009.
Closes #1009
342fca91 [zhouyifan279] [KYUUBI #1009] Implement Hive metastore server
Delegation Token Provider
a9e1b34d [zhouyifan279] [KYUUBI #1009] Implement Hive metastore server
Delegation Token Provider
82da9007 [zhouyifan279] [KYUUBI #1009] Implement Hive metastore server
Delegation Token Provider
2c00dd3a [zhouyifan279] [KYUUBI #1009] Implement Hive metastore server
Delegation Token Provider
Authored-by: zhouyifan279 <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
dev/dependencyList | 4 +
docs/deployment/settings.md | 1 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 7 +
kyuubi-server/pom.xml | 72 ++++++-
...g.apache.hadoop.security.token.TokenIdentifier} | 2 +-
...yuubi.credentials.HadoopDelegationTokenProvider | 1 +
.../credentials/HadoopCredentialsManager.scala | 7 +-
.../credentials/HadoopDelegationProvider.scala | 28 +--
.../HadoopFsDelegationTokenProvider.scala | 26 +--
.../credentials/HiveDelegationTokenProvider.scala | 72 +++++++
.../HadoopCredentialsManagerSuite.scala | 37 ++--
.../HadoopFsDelegationTokenProviderSuite.scala | 13 +-
.../HiveDelegationTokenProviderSuite.scala | 208 +++++++++++++++++++++
pom.xml | 101 ++++++++++
14 files changed, 516 insertions(+), 63 deletions(-)
diff --git a/dev/dependencyList b/dev/dependencyList
index ebe2659..c2b27a0 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -17,6 +17,7 @@
aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
commons-codec/1.15//commons-codec-1.15.jar
+commons-lang/2.6//commons-lang-2.6.jar
commons-lang3/3.10//commons-lang3-3.10.jar
curator-client/2.12.0//curator-client-2.12.0.jar
curator-framework/2.12.0//curator-framework-2.12.0.jar
@@ -25,7 +26,10 @@ failureaccess/1.0.1//failureaccess-1.0.1.jar
guava/30.1-jre//guava-30.1-jre.jar
hadoop-client-api/3.2.2//hadoop-client-api-3.2.2.jar
hadoop-client-runtime/3.2.2//hadoop-client-runtime-3.2.2.jar
+hive-common/2.3.7//hive-common-2.3.7.jar
+hive-metastore/2.3.7//hive-metastore-2.3.7.jar
hive-service-rpc/2.3.7//hive-service-rpc-2.3.7.jar
+hive-shims-common/2.3.7//hive-shims-common-2.3.7.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
hk2-locator/2.6.1//hk2-locator-2.6.1.jar
hk2-utils/2.6.1//hk2-utils-2.6.1.jar
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index df2b936..a64c07b 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -153,6 +153,7 @@ Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi\.credentials<br>\.hadoopfs\.enabled|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Whether to renew HadoopFS
DelegationToken</div>|<div style='width: 30pt'>boolean</div>|<div style='width:
20pt'>1.4.0</div>
kyuubi\.credentials<br>\.hadoopfs\.uris|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Extra Hadoop filesystem URIs for which to
request delegation tokens. The filesystem that hosts fs.defaultFS does not need
to be listed here.</div>|<div style='width: 30pt'>seq</div>|<div style='width:
20pt'>1.4.0</div>
+kyuubi\.credentials<br>\.hive\.enabled|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Whether to renew HiveMetaStore
DelegationToken</div>|<div style='width: 30pt'>boolean</div>|<div style='width:
20pt'>1.4.0</div>
kyuubi\.credentials<br>\.renewal\.interval|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT1H</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>How often Kyuubi renews one user's
DelegationTokens</div>|<div style='width: 30pt'>duration</div>|<div
style='width: 20pt'>1.4.0</div>
kyuubi\.credentials<br>\.renewal\.retryWait|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>How long to wait before retrying to fetch new
credentials after a failure.</div>|<div style='width: 30pt'>duration</div>|<div
style='width: 20pt'>1.4.0</div>
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index ce748f4..05b8aa6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -265,6 +265,13 @@ object KyuubiConf {
.toSequence()
.createWithDefault(Nil)
+ val CREDENTIALS_HIVE_ENABLED: ConfigEntry[Boolean] =
+ buildConf("credentials.hive.enabled")
+ .doc("Whether to renew HiveMetaStore DelegationToken")
+ .version("1.4.0")
+ .booleanConf
+ .createWithDefault(true)
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// Frontend Service Configuration
//
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index dbc1aec..8b18244 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -63,7 +63,53 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
- <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.shims</groupId>
+ <artifactId>hive-shims-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.tdunning</groupId>
+ <artifactId>json</artifactId>
+ </exclusion>
+      <exclusion>
+        <groupId>com.github.joshelser</groupId>
+        <artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
+      </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -111,11 +157,6 @@
<dependency>
<groupId>org.apache.hive</groupId>
- <artifactId>hive-common</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<scope>test</scope>
</dependency>
@@ -136,6 +177,25 @@
</dependency>
<dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
diff --git
a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
similarity index 92%
copy from
kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
copy to
kyuubi-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
index 1dd41e9..9f2b2c5 100644
---
a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
+++
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider
+org.apache.hadoop.hive.thrift.DelegationTokenIdentifier
diff --git
a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
index 1dd41e9..1d931c8 100644
---
a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
+++
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
@@ -16,3 +16,4 @@
#
org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider
+org.apache.kyuubi.credentials.HiveDelegationTokenProvider
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index 253778d..5d01790 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -92,10 +92,12 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
providers = HadoopCredentialsManager.loadProviders(conf)
.filter { case (_, provider) =>
- val required = provider.delegationTokensRequired(hadoopConf, conf)
+ provider.initialize(hadoopConf, conf)
+ val required = provider.delegationTokensRequired()
if (!required) {
warn(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or
not.")
+ provider.close()
}
required
}
@@ -121,6 +123,7 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
}
override def stop(): Unit = {
+ providers.values.foreach(_.close())
renewalExecutor.foreach { executor =>
executor.shutdownNow()
try {
@@ -200,7 +203,7 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
try {
val creds = new Credentials()
providers.values
- .foreach(_.obtainDelegationTokens(hadoopConf, conf,
userRef.getAppUser, creds))
+ .foreach(_.obtainDelegationTokens(userRef.getAppUser, creds))
userRef.updateCredentials(creds)
scheduleRenewal(userRef, renewalInterval)
} catch {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
index cba5761..519ae78 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
@@ -20,10 +20,9 @@ package org.apache.kyuubi.credentials
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
-import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
-trait HadoopDelegationTokenProvider extends Logging {
+trait HadoopDelegationTokenProvider {
/**
* Name of the service to provide delegation tokens. This name should be
unique. Kyuubi will
@@ -32,22 +31,27 @@ trait HadoopDelegationTokenProvider extends Logging {
def serviceName: String
/**
+ * Initialize with provided hadoop and kyuubi conf
+ * @param hadoopConf Configuration of current Hadoop Compatible system.
+ */
+ def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Unit
+
+ /**
* Returns true if delegation tokens are required for this service. By
default, it is based on
* whether Hadoop security is enabled.
*/
- def delegationTokensRequired(hadoopConf: Configuration, kyuubiConf:
KyuubiConf): Boolean
+ def delegationTokensRequired(): Boolean
/**
* Obtain delegation tokens for this service.
- *
- * @param hadoopConf Configuration of current Hadoop Compatible system.
- * @param owner DelegationToken owner.
- * @param creds Credentials to add tokens and security keys to.
+ * @param owner DelegationToken owner.
+ * @param creds Credentials to add tokens and security keys to.
+ */
+ def obtainDelegationTokens(owner: String, creds: Credentials): Unit
+
+ /**
+ * Close underlying resources if any
*/
- def obtainDelegationTokens(
- hadoopConf: Configuration,
- kyuubiConf: KyuubiConf,
- owner: String,
- creds: Credentials): Unit
+ def close(): Unit = {}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala
index 8bf0837..148b084 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala
@@ -33,28 +33,30 @@ import
org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{disableFsC
class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider
with Logging {
+ private var tokenRequired: Boolean = _
+ private var hadoopConf: Configuration = _
+ private var kyuubiConf: KyuubiConf = _
+
override val serviceName: String = "hadoopfs"
- override def delegationTokensRequired(
- hadoopConf: Configuration,
- kyuubiConf: KyuubiConf): Boolean = {
- SecurityUtil.getAuthenticationMethod(hadoopConf) !=
AuthenticationMethod.SIMPLE
- }
+ override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf):
Unit = {
+ this.tokenRequired =
+ SecurityUtil.getAuthenticationMethod(hadoopConf) !=
AuthenticationMethod.SIMPLE
- override def obtainDelegationTokens(
- hadoopConf: Configuration,
- kyuubiConf: KyuubiConf,
- owner: String,
- creds: Credentials): Unit = {
// FileSystem objects are cached in FileSystem.CACHE by a composite key.
// The UserGroupInformation object used to create it is part of that key.
// If cache is enabled, new FileSystem objects are created and cached at
every method
// invocation.
- val internalConf = disableFsCache(kyuubiConf, hadoopConf)
+ this.hadoopConf = disableFsCache(kyuubiConf, hadoopConf)
+ this.kyuubiConf = kyuubiConf
+ }
+
+ override def delegationTokensRequired(): Boolean = tokenRequired
+ override def obtainDelegationTokens(owner: String, creds: Credentials): Unit
= {
doAsProxyUser(owner) {
val fileSystems =
- HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf,
internalConf)
+ HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf,
hadoopConf)
try {
// Renewer is not needed. But setting a renewer can avoid potential
NPE.
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProvider.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProvider.scala
new file mode 100644
index 0000000..e1236ff
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProvider.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.{IMetaStoreClient, RetryingMetaStoreClient}
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, SecurityUtil}
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
+import org.apache.hadoop.security.token.Token
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+
+class HiveDelegationTokenProvider extends HadoopDelegationTokenProvider with
Logging {
+
+ private var client: Option[IMetaStoreClient] = None
+ private var principal: String = _
+
+ override def serviceName: String = "hive"
+
+ override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf):
Unit = {
+ val conf = new HiveConf(hadoopConf, classOf[HiveConf])
+ val metastoreUris = conf.getTrimmed("hive.metastore.uris", "")
+
+ if (SecurityUtil.getAuthenticationMethod(hadoopConf) !=
AuthenticationMethod.SIMPLE
+ && metastoreUris.nonEmpty
+ && conf.getBoolean("hive.metastore.sasl.enabled", false)) {
+
+ val principalKey = "hive.metastore.kerberos.principal"
+ principal = conf.getTrimmed(principalKey, "")
+ require(principal.nonEmpty, s"Hive principal $principalKey undefined")
+
+ client = Some(RetryingMetaStoreClient.getProxy(conf, false))
+ info(s"Created HiveMetaStoreClient with metastore uris $metastoreUris")
+ }
+ }
+
+ override def delegationTokensRequired(): Boolean = client.nonEmpty
+
+ override def obtainDelegationTokens(owner: String, creds: Credentials): Unit
= {
+ client.foreach { client =>
+ info(s"Getting Hive delegation token for $owner against $principal")
+ val tokenStr = client.getDelegationToken(owner, principal)
+ val hive2Token = new Token[DelegationTokenIdentifier]()
+ hive2Token.decodeFromUrlString(tokenStr)
+ debug(s"Get Token from hive metastore: ${hive2Token.toString}")
+ creds.addToken(tokenAlias, hive2Token)
+ }
+ }
+
+ override def close(): Unit = client.foreach(_.close())
+
+ private def tokenAlias: Text = new Text("hive.server2.delegation.token")
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
index a0d5e38..a721c3c 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
@@ -47,6 +47,7 @@ class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
ExceptionThrowingDelegationTokenProvider.constructed = false
val providers = HadoopCredentialsManager.loadProviders(new
KyuubiConf(false))
assert(providers.contains("hadoopfs"))
+ assert(providers.contains("hive"))
assert(providers.contains("unstable"))
assert(providers.contains("unrequired"))
// This checks that providers are loaded independently and they have no
effect on each other
@@ -159,15 +160,11 @@ private class ExceptionThrowingDelegationTokenProvider
extends HadoopDelegationT
override def serviceName: String = "throw"
- override def delegationTokensRequired(
- hadoopConf: Configuration,
- kyuubiConf: KyuubiConf): Boolean = true
+ override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf):
Unit = {}
- override def obtainDelegationTokens(
- hadoopConf: Configuration,
- kyuubiConf: KyuubiConf,
- owner: String,
- creds: Credentials): Unit = {}
+ override def delegationTokensRequired(): Boolean = true
+
+ override def obtainDelegationTokens(owner: String, creds: Credentials): Unit
= {}
}
@@ -179,15 +176,11 @@ private class UnRequiredDelegationTokenProvider extends
HadoopDelegationTokenPro
override def serviceName: String = "unrequired"
- override def delegationTokensRequired(
- hadoopConf: Configuration,
- kyuubiConf: KyuubiConf): Boolean = false
+ override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf):
Unit = {}
+
+ override def delegationTokensRequired(): Boolean = false
- override def obtainDelegationTokens(
- hadoopConf: Configuration,
- kyuubiConf: KyuubiConf,
- owner: String,
- creds: Credentials): Unit = {}
+ override def obtainDelegationTokens(owner: String, creds: Credentials): Unit
= {}
}
@@ -195,15 +188,11 @@ private class UnstableDelegationTokenProvider extends
HadoopDelegationTokenProvi
override def serviceName: String = "unstable"
- override def delegationTokensRequired(
- hadoopConf: Configuration,
- kyuubiConf: KyuubiConf): Boolean = true
+ override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf):
Unit = {}
+
+ override def delegationTokensRequired(): Boolean = true
- override def obtainDelegationTokens(
- hadoopConf: Configuration,
- kyuubiConf: KyuubiConf,
- owner: String,
- creds: Credentials): Unit = {
+ override def obtainDelegationTokens(owner: String, creds: Credentials): Unit
= {
if (UnstableDelegationTokenProvider.throwException) {
UnstableDelegationTokenProvider.exceptionCount += 1
throw new IllegalArgumentException
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala
index 7a4ed90..352ac3e 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala
@@ -34,14 +34,15 @@ class HadoopFsDelegationTokenProviderSuite extends
WithSecuredDFSService {
UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
val hdfsConf = getHadoopConf()
+ val kyuubiConf = new KyuubiConf(false)
+
+ val provider = new HadoopFsDelegationTokenProvider
+ provider.initialize(hdfsConf, kyuubiConf)
+ assert(provider.delegationTokensRequired())
+
val owner = "who"
val credentials = new Credentials()
- val provider = new HadoopFsDelegationTokenProvider
- provider.obtainDelegationTokens(
- hdfsConf,
- new KyuubiConf(false),
- owner,
- credentials)
+ provider.obtainDelegationTokens(owner, credentials)
val token = credentials
.getToken(new Text(FileSystem.get(hdfsConf).getCanonicalServiceName))
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
new file mode 100644
index 0000000..d600a64
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import java.io.{File, FileOutputStream}
+import java.net.URLClassLoader
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars._
+import org.apache.hadoop.hive.metastore.{HiveMetaException, HiveMetaStore}
+import org.apache.hadoop.hive.thrift.{DelegationTokenIdentifier, HadoopThriftAuthBridge, HadoopThriftAuthBridge23}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.thrift.TProcessor
+import org.apache.thrift.protocol.TProtocol
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.{KerberizedTestHelper, Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.credentials.LocalMetaServer.defaultHiveConf
+
+class HiveDelegationTokenProviderSuite extends KerberizedTestHelper {
+
+ private val hadoopConfDir: File = Utils.createTempDir().toFile
+ private var hiveConf: HiveConf = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ tryWithSecurityEnabled {
+ UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
+
+ // HiveMetaStore creates a new hadoop `Configuration` object for each
request to verify
+ // whether user can impersonate others.
+ // So we create a URLClassLoader with core-site.xml and set it as the
thread's context
+ // classloader when creating `Configuration` object.
+ val conf = new Configuration(false)
+ conf.set("hadoop.security.authentication", "kerberos")
+ val realUser = UserGroupInformation.getCurrentUser.getShortUserName
+ conf.set(s"hadoop.proxyuser.$realUser.groups", "*")
+ conf.set(s"hadoop.proxyuser.$realUser.hosts", "*")
+
+ val xml = new File(hadoopConfDir, "core-site.xml")
+ val os = new FileOutputStream(xml)
+ try {
+ conf.writeXml(os)
+ } finally {
+ os.close()
+ }
+
+ val classloader =
+ new URLClassLoader(
+ Array(hadoopConfDir.toURI.toURL),
+ classOf[Configuration].getClassLoader)
+
+ hiveConf = LocalMetaServer.defaultHiveConf()
+ hiveConf.addResource(conf)
+ hiveConf.setVar(METASTORE_USE_THRIFT_SASL, "true")
+ hiveConf.setVar(METASTORE_KERBEROS_PRINCIPAL, testPrincipal)
+ hiveConf.setVar(METASTORE_KERBEROS_KEYTAB_FILE, testKeytab)
+ val metaServer = new LocalMetaServer(hiveConf, classloader)
+ metaServer.start()
+ }
+ }
+
+ override def afterAll(): Unit = {
+ FileUtils.deleteDirectory(hadoopConfDir)
+ }
+
+ test("obtain hive delegation token") {
+ tryWithSecurityEnabled {
+ UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
+
+ val kyuubiConf = new KyuubiConf(false)
+ val provider = new HiveDelegationTokenProvider
+ provider.initialize(hiveConf, kyuubiConf)
+ assert(provider.delegationTokensRequired())
+
+ val owner = "who"
+ val credentials = new Credentials
+ provider.obtainDelegationTokens(owner, credentials)
+
+ val token = credentials.getAllTokens.asScala
+ .filter(_.getKind == DelegationTokenIdentifier.HIVE_DELEGATION_KIND)
+ .head
+ assert(token != null)
+
+      val tokenIdent = token.decodeIdentifier().asInstanceOf[DelegationTokenIdentifier]
+      assertResult(DelegationTokenIdentifier.HIVE_DELEGATION_KIND)(token.getKind)
+ assertResult(new Text(owner))(tokenIdent.getOwner)
+ val currentUserName = UserGroupInformation.getCurrentUser.getUserName
+ assertResult(new Text(currentUserName))(tokenIdent.getRealUser)
+ }
+ }
+}
+
+class LocalMetaServer(
+ hiveConf: HiveConf = defaultHiveConf(),
+ serverContextClassLoader: ClassLoader)
+ extends Logging {
+ import LocalMetaServer._
+
+ def start(): Unit = {
+ val startLock = new ReentrantLock
+ val startCondition = startLock.newCondition
+ val startedServing = new AtomicBoolean(false)
+ val startFailed = new AtomicBoolean(false)
+
+ Future {
+ try {
+ HiveMetaStore.startMetaStore(
+ port,
+ new HadoopThriftAuthBridgeWithServerContextClassLoader(
+ serverContextClassLoader),
+ hiveConf,
+ startLock,
+ startCondition,
+ startedServing)
+ } catch {
+ case t: Throwable =>
+ error("Failed to start LocalMetaServer", t)
+ startFailed.set(true)
+ }
+ }
+
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
+ assert(startedServing.get() || startFailed.get())
+ }
+
+ if (startFailed.get()) {
+ throw new HiveMetaException("Failed to start LocalMetaServer")
+ }
+ }
+
+ def getHiveConf: HiveConf = hiveConf
+}
+
+object LocalMetaServer {
+
+ private val port = 20101
+
+ def defaultHiveConf(): HiveConf = {
+ val hiveConf = new HiveConf()
+ hiveConf.setVar(METASTOREURIS, "thrift://localhost:" + port)
+ hiveConf.setVar(METASTORE_SCHEMA_VERIFICATION, "false")
+ hiveConf.set("datanucleus.schema.autoCreateTables", "true")
+ hiveConf
+ }
+
+}
+
+class HadoopThriftAuthBridgeWithServerContextClassLoader(classloader:
ClassLoader)
+ extends HadoopThriftAuthBridge23 {
+
+ override def createServer(
+ keytabFile: String,
+ principalConf: String): HadoopThriftAuthBridge.Server = {
+ new Server(keytabFile, principalConf)
+ }
+
+ class Server(keytabFile: String, principalConf: String)
+ extends HadoopThriftAuthBridge.Server(keytabFile, principalConf) {
+
+ override def wrapProcessor(processor: TProcessor): TProcessor = {
+ new SetThreadContextClassLoaderProcess(super.wrapProcessor(processor))
+ }
+
+ }
+
+ class SetThreadContextClassLoaderProcess(wrapped: TProcessor) extends
TProcessor {
+
+ override def process(in: TProtocol, out: TProtocol): Boolean = {
+ val origin = Thread.currentThread().getContextClassLoader
+ try {
+ Thread.currentThread().setContextClassLoader(classloader)
+ wrapped.process(in, out)
+ } finally {
+ Thread.currentThread().setContextClassLoader(origin)
+ }
+ }
+
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index c663977..be40371 100644
--- a/pom.xml
+++ b/pom.xml
@@ -810,6 +810,107 @@
</dependency>
<dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${hive.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${hive.version}</version>
+ <type>test-jar</type>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>co.cask.tephra</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javolution</groupId>
+ <artifactId>javolution</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ </exclusion>
+ <exclusion>
+          <groupId>com.jolbox</groupId>
+          <artifactId>bonecp</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-dbcp</groupId>
+ <artifactId>commons-dbcp</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.shims</groupId>
+ <artifactId>hive-shims-common</artifactId>
+ <version>${hive.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.shims</groupId>
+ <artifactId>hive-shims-0.23</artifactId>
+ <version>${hive.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>