This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b89272  [KYUUBI #1009] Implement Hive metastore server Delegation Token Provider
1b89272 is described below

commit 1b8927274999ef5519309032873b43d3b9c11569
Author: zhouyifan279 <[email protected]>
AuthorDate: Thu Sep 9 19:19:58 2021 +0800

    [KYUUBI #1009] Implement Hive metastore server Delegation Token Provider
    
    ### _Why are the changes needed?_
    This PR finishes the work of issue #1009
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1053 from zhouyifan279/KYUUBI#1009.
    
    Closes #1009
    
    342fca91 [zhouyifan279] [KYUUBI #1009] Implement Hive metastore server Delegation Token Provider
    a9e1b34d [zhouyifan279] [KYUUBI #1009] Implement Hive metastore server Delegation Token Provider
    82da9007 [zhouyifan279] [KYUUBI #1009] Implement Hive metastore server Delegation Token Provider
    2c00dd3a [zhouyifan279] [KYUUBI #1009] Implement Hive metastore server Delegation Token Provider
    
    Authored-by: zhouyifan279 <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 dev/dependencyList                                 |   4 +
 docs/deployment/settings.md                        |   1 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |   7 +
 kyuubi-server/pom.xml                              |  72 ++++++-
 ...g.apache.hadoop.security.token.TokenIdentifier} |   2 +-
 ...yuubi.credentials.HadoopDelegationTokenProvider |   1 +
 .../credentials/HadoopCredentialsManager.scala     |   7 +-
 .../credentials/HadoopDelegationProvider.scala     |  28 +--
 .../HadoopFsDelegationTokenProvider.scala          |  26 +--
 .../credentials/HiveDelegationTokenProvider.scala  |  72 +++++++
 .../HadoopCredentialsManagerSuite.scala            |  37 ++--
 .../HadoopFsDelegationTokenProviderSuite.scala     |  13 +-
 .../HiveDelegationTokenProviderSuite.scala         | 208 +++++++++++++++++++++
 pom.xml                                            | 101 ++++++++++
 14 files changed, 516 insertions(+), 63 deletions(-)

diff --git a/dev/dependencyList b/dev/dependencyList
index ebe2659..c2b27a0 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -17,6 +17,7 @@
 
 aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
 commons-codec/1.15//commons-codec-1.15.jar
+commons-lang/2.6//commons-lang-2.6.jar
 commons-lang3/3.10//commons-lang3-3.10.jar
 curator-client/2.12.0//curator-client-2.12.0.jar
 curator-framework/2.12.0//curator-framework-2.12.0.jar
@@ -25,7 +26,10 @@ failureaccess/1.0.1//failureaccess-1.0.1.jar
 guava/30.1-jre//guava-30.1-jre.jar
 hadoop-client-api/3.2.2//hadoop-client-api-3.2.2.jar
 hadoop-client-runtime/3.2.2//hadoop-client-runtime-3.2.2.jar
+hive-common/2.3.7//hive-common-2.3.7.jar
+hive-metastore/2.3.7//hive-metastore-2.3.7.jar
 hive-service-rpc/2.3.7//hive-service-rpc-2.3.7.jar
+hive-shims-common/2.3.7//hive-shims-common-2.3.7.jar
 hk2-api/2.6.1//hk2-api-2.6.1.jar
 hk2-locator/2.6.1//hk2-locator-2.6.1.jar
 hk2-utils/2.6.1//hk2-utils-2.6.1.jar
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index df2b936..a64c07b 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -153,6 +153,7 @@ Key | Default | Meaning | Type | Since
 --- | --- | --- | --- | ---
 kyuubi\.credentials<br>\.hadoopfs\.enabled|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Whether to renew HadoopFS 
DelegationToken</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 
20pt'>1.4.0</div>
 kyuubi\.credentials<br>\.hadoopfs\.uris|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Extra Hadoop filesystem URIs for which to 
request delegation tokens. The filesystem that hosts fs.defaultFS does not need 
to be listed here.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 
20pt'>1.4.0</div>
+kyuubi\.credentials<br>\.hive\.enabled|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Whether to renew HiveMetaStore 
DelegationToken</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 
20pt'>1.4.0</div>
 kyuubi\.credentials<br>\.renewal\.interval|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT1H</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>How often Kyuubi renews one user's 
DelegationTokens</div>|<div style='width: 30pt'>duration</div>|<div 
style='width: 20pt'>1.4.0</div>
 kyuubi\.credentials<br>\.renewal\.retryWait|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>How long to wait before retrying to fetch new 
credentials after a failure.</div>|<div style='width: 30pt'>duration</div>|<div 
style='width: 20pt'>1.4.0</div>
 
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index ce748f4..05b8aa6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -265,6 +265,13 @@ object KyuubiConf {
       .toSequence()
       .createWithDefault(Nil)
 
+  val CREDENTIALS_HIVE_ENABLED: ConfigEntry[Boolean] =
+    buildConf("credentials.hive.enabled")
+      .doc("Whether to renew HiveMetaStore DelegationToken")
+      .version("1.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   //                              Frontend Service Configuration               
                  //
   
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index dbc1aec..8b18244 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -63,7 +63,53 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client-runtime</artifactId>
-            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive.shims</groupId>
+            <artifactId>hive-shims-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-common</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-cli</groupId>
+                    <artifactId>commons-cli</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jline</groupId>
+                    <artifactId>jline</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>joda-time</groupId>
+                    <artifactId>joda-time</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.tdunning</groupId>
+                    <artifactId>json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.github.joshelser</groupId>
+                    
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -111,11 +157,6 @@
 
         <dependency>
             <groupId>org.apache.hive</groupId>
-            <artifactId>hive-common</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hive</groupId>
             <artifactId>hive-jdbc</artifactId>
             <scope>test</scope>
         </dependency>
@@ -136,6 +177,25 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-minikdc</artifactId>
             <scope>test</scope>
diff --git 
a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
 
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
similarity index 92%
copy from 
kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
copy to 
kyuubi-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
index 1dd41e9..9f2b2c5 100644
--- 
a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
+++ 
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider
+org.apache.hadoop.hive.thrift.DelegationTokenIdentifier
diff --git 
a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
 
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
index 1dd41e9..1d931c8 100644
--- 
a/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
+++ 
b/kyuubi-server/src/main/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider
@@ -16,3 +16,4 @@
 #
 
 org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider
+org.apache.kyuubi.credentials.HiveDelegationTokenProvider
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index 253778d..5d01790 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -92,10 +92,12 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
     hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
     providers = HadoopCredentialsManager.loadProviders(conf)
       .filter { case (_, provider) =>
-        val required = provider.delegationTokensRequired(hadoopConf, conf)
+        provider.initialize(hadoopConf, conf)
+        val required = provider.delegationTokensRequired()
         if (!required) {
           warn(s"Service ${provider.serviceName} does not require a token." +
             s" Check your configuration to see if security is disabled or 
not.")
+          provider.close()
         }
         required
       }
@@ -121,6 +123,7 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
   }
 
   override def stop(): Unit = {
+    providers.values.foreach(_.close())
     renewalExecutor.foreach { executor =>
       executor.shutdownNow()
       try {
@@ -200,7 +203,7 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
         try {
           val creds = new Credentials()
           providers.values
-            .foreach(_.obtainDelegationTokens(hadoopConf, conf, 
userRef.getAppUser, creds))
+            .foreach(_.obtainDelegationTokens(userRef.getAppUser, creds))
           userRef.updateCredentials(creds)
           scheduleRenewal(userRef, renewalInterval)
         } catch {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
index cba5761..519ae78 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala
@@ -20,10 +20,9 @@ package org.apache.kyuubi.credentials
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
 
-import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 
-trait HadoopDelegationTokenProvider extends Logging {
+trait HadoopDelegationTokenProvider {
 
   /**
    * Name of the service to provide delegation tokens. This name should be 
unique. Kyuubi will
@@ -32,22 +31,27 @@ trait HadoopDelegationTokenProvider extends Logging {
   def serviceName: String
 
   /**
+   * Initialize with provided hadoop and kyuubi conf
+   * @param hadoopConf Configuration of current Hadoop Compatible system.
+   */
+  def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Unit
+
+  /**
    * Returns true if delegation tokens are required for this service. By 
default, it is based on
    * whether Hadoop security is enabled.
    */
-  def delegationTokensRequired(hadoopConf: Configuration, kyuubiConf: 
KyuubiConf): Boolean
+  def delegationTokensRequired(): Boolean
 
   /**
    * Obtain delegation tokens for this service.
-   *
-   * @param hadoopConf Configuration of current Hadoop Compatible system.
-   * @param owner      DelegationToken owner.
-   * @param creds      Credentials to add tokens and security keys to.
+   * @param owner DelegationToken owner.
+   * @param creds Credentials to add tokens and security keys to.
+   */
+  def obtainDelegationTokens(owner: String, creds: Credentials): Unit
+
+  /**
+   * Close underlying resources if any
    */
-  def obtainDelegationTokens(
-    hadoopConf: Configuration,
-    kyuubiConf: KyuubiConf,
-    owner: String,
-    creds: Credentials): Unit
+  def close(): Unit = {}
 
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala
index 8bf0837..148b084 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala
@@ -33,28 +33,30 @@ import 
org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{disableFsC
 
 class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider 
with Logging {
 
+  private var tokenRequired: Boolean = _
+  private var hadoopConf: Configuration = _
+  private var kyuubiConf: KyuubiConf = _
+
   override val serviceName: String = "hadoopfs"
 
-  override def delegationTokensRequired(
-      hadoopConf: Configuration,
-      kyuubiConf: KyuubiConf): Boolean = {
-    SecurityUtil.getAuthenticationMethod(hadoopConf) != 
AuthenticationMethod.SIMPLE
-  }
+  override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): 
Unit = {
+    this.tokenRequired =
+      SecurityUtil.getAuthenticationMethod(hadoopConf) != 
AuthenticationMethod.SIMPLE
 
-  override def obtainDelegationTokens(
-      hadoopConf: Configuration,
-      kyuubiConf: KyuubiConf,
-      owner: String,
-      creds: Credentials): Unit = {
     // FileSystem objects are cached in FileSystem.CACHE by a composite key.
     // The UserGroupInformation object used to create it is part of that key.
     // If cache is enabled, new FileSystem objects are created and cached at 
every method
     // invocation.
-    val internalConf = disableFsCache(kyuubiConf, hadoopConf)
+    this.hadoopConf = disableFsCache(kyuubiConf, hadoopConf)
+    this.kyuubiConf = kyuubiConf
+  }
+
+  override def delegationTokensRequired(): Boolean = tokenRequired
 
+  override def obtainDelegationTokens(owner: String, creds: Credentials): Unit 
= {
     doAsProxyUser(owner) {
       val fileSystems =
-        HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, 
internalConf)
+        HadoopFsDelegationTokenProvider.hadoopFSsToAccess(kyuubiConf, 
hadoopConf)
 
       try {
         // Renewer is not needed. But setting a renewer can avoid potential 
NPE.
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProvider.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProvider.scala
new file mode 100644
index 0000000..e1236ff
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProvider.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.{IMetaStoreClient, 
RetryingMetaStoreClient}
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, SecurityUtil}
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
+import org.apache.hadoop.security.token.Token
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+
+class HiveDelegationTokenProvider extends HadoopDelegationTokenProvider with 
Logging {
+
+  private var client: Option[IMetaStoreClient] = None
+  private var principal: String = _
+
+  override def serviceName: String = "hive"
+
+  override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): 
Unit = {
+    val conf = new HiveConf(hadoopConf, classOf[HiveConf])
+    val metastoreUris = conf.getTrimmed("hive.metastore.uris", "")
+
+    if (SecurityUtil.getAuthenticationMethod(hadoopConf) != 
AuthenticationMethod.SIMPLE
+      && metastoreUris.nonEmpty
+      && conf.getBoolean("hive.metastore.sasl.enabled", false)) {
+
+      val principalKey = "hive.metastore.kerberos.principal"
+      principal = conf.getTrimmed(principalKey, "")
+      require(principal.nonEmpty, s"Hive principal $principalKey undefined")
+
+      client = Some(RetryingMetaStoreClient.getProxy(conf, false))
+      info(s"Created HiveMetaStoreClient with metastore uris $metastoreUris")
+    }
+  }
+
+  override def delegationTokensRequired(): Boolean = client.nonEmpty
+
+  override def obtainDelegationTokens(owner: String, creds: Credentials): Unit 
= {
+    client.foreach { client =>
+      info(s"Getting Hive delegation token for $owner against $principal")
+      val tokenStr = client.getDelegationToken(owner, principal)
+      val hive2Token = new Token[DelegationTokenIdentifier]()
+      hive2Token.decodeFromUrlString(tokenStr)
+      debug(s"Get Token from hive metastore: ${hive2Token.toString}")
+      creds.addToken(tokenAlias, hive2Token)
+    }
+  }
+
+  override def close(): Unit = client.foreach(_.close())
+
+  private def tokenAlias: Text = new Text("hive.server2.delegation.token")
+}
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
index a0d5e38..a721c3c 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala
@@ -47,6 +47,7 @@ class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
     ExceptionThrowingDelegationTokenProvider.constructed = false
     val providers = HadoopCredentialsManager.loadProviders(new 
KyuubiConf(false))
     assert(providers.contains("hadoopfs"))
+    assert(providers.contains("hive"))
     assert(providers.contains("unstable"))
     assert(providers.contains("unrequired"))
     // This checks that providers are loaded independently and they have no 
effect on each other
@@ -159,15 +160,11 @@ private class ExceptionThrowingDelegationTokenProvider 
extends HadoopDelegationT
 
   override def serviceName: String = "throw"
 
-  override def delegationTokensRequired(
-      hadoopConf: Configuration,
-      kyuubiConf: KyuubiConf): Boolean = true
+  override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): 
Unit = {}
 
-  override def obtainDelegationTokens(
-      hadoopConf: Configuration,
-      kyuubiConf: KyuubiConf,
-      owner: String,
-      creds: Credentials): Unit = {}
+  override def delegationTokensRequired(): Boolean = true
+
+  override def obtainDelegationTokens(owner: String, creds: Credentials): Unit 
= {}
 
 }
 
@@ -179,15 +176,11 @@ private class UnRequiredDelegationTokenProvider extends 
HadoopDelegationTokenPro
 
   override def serviceName: String = "unrequired"
 
-  override def delegationTokensRequired(
-      hadoopConf: Configuration,
-      kyuubiConf: KyuubiConf): Boolean = false
+  override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): 
Unit = {}
+
+  override def delegationTokensRequired(): Boolean = false
 
-  override def obtainDelegationTokens(
-      hadoopConf: Configuration,
-      kyuubiConf: KyuubiConf,
-      owner: String,
-      creds: Credentials): Unit = {}
+  override def obtainDelegationTokens(owner: String, creds: Credentials): Unit 
= {}
 
 }
 
@@ -195,15 +188,11 @@ private class UnstableDelegationTokenProvider extends 
HadoopDelegationTokenProvi
 
   override def serviceName: String = "unstable"
 
-  override def delegationTokensRequired(
-      hadoopConf: Configuration,
-      kyuubiConf: KyuubiConf): Boolean = true
+  override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): 
Unit = {}
+
+  override def delegationTokensRequired(): Boolean = true
 
-  override def obtainDelegationTokens(
-      hadoopConf: Configuration,
-      kyuubiConf: KyuubiConf,
-      owner: String,
-      creds: Credentials): Unit = {
+  override def obtainDelegationTokens(owner: String, creds: Credentials): Unit 
= {
     if (UnstableDelegationTokenProvider.throwException) {
       UnstableDelegationTokenProvider.exceptionCount += 1
       throw new IllegalArgumentException
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala
index 7a4ed90..352ac3e 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProviderSuite.scala
@@ -34,14 +34,15 @@ class HadoopFsDelegationTokenProviderSuite extends 
WithSecuredDFSService {
       UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
 
       val hdfsConf = getHadoopConf()
+      val kyuubiConf = new KyuubiConf(false)
+
+      val provider = new HadoopFsDelegationTokenProvider
+      provider.initialize(hdfsConf, kyuubiConf)
+      assert(provider.delegationTokensRequired())
+
       val owner = "who"
       val credentials = new Credentials()
-      val provider = new HadoopFsDelegationTokenProvider
-      provider.obtainDelegationTokens(
-        hdfsConf,
-        new KyuubiConf(false),
-        owner,
-        credentials)
+      provider.obtainDelegationTokens(owner, credentials)
 
       val token = credentials
         .getToken(new Text(FileSystem.get(hdfsConf).getCanonicalServiceName))
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
new file mode 100644
index 0000000..d600a64
--- /dev/null
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.credentials
+
+import java.io.{File, FileOutputStream}
+import java.net.URLClassLoader
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars._
+import org.apache.hadoop.hive.metastore.{HiveMetaException, HiveMetaStore}
+import org.apache.hadoop.hive.thrift.{DelegationTokenIdentifier, 
HadoopThriftAuthBridge, HadoopThriftAuthBridge23}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.thrift.TProcessor
+import org.apache.thrift.protocol.TProtocol
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.{KerberizedTestHelper, Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.credentials.LocalMetaServer.defaultHiveConf
+
+class HiveDelegationTokenProviderSuite extends KerberizedTestHelper {
+
+  private val hadoopConfDir: File = Utils.createTempDir().toFile
+  private var hiveConf: HiveConf = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    tryWithSecurityEnabled {
+      UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
+
+      // HiveMetaStore creates a new hadoop `Configuration` object for each 
request to verify
+      // whether user can impersonate others.
+      // So we create a URLClassLoader with core-site.xml and set it as the 
thread's context
+      // classloader when creating `Configuration` object.
+      val conf = new Configuration(false)
+      conf.set("hadoop.security.authentication", "kerberos")
+      val realUser = UserGroupInformation.getCurrentUser.getShortUserName
+      conf.set(s"hadoop.proxyuser.$realUser.groups", "*")
+      conf.set(s"hadoop.proxyuser.$realUser.hosts", "*")
+
+      val xml = new File(hadoopConfDir, "core-site.xml")
+      val os = new FileOutputStream(xml)
+      try {
+        conf.writeXml(os)
+      } finally {
+        os.close()
+      }
+
+      val classloader =
+        new URLClassLoader(
+          Array(hadoopConfDir.toURI.toURL),
+          classOf[Configuration].getClassLoader)
+
+      hiveConf = LocalMetaServer.defaultHiveConf()
+      hiveConf.addResource(conf)
+      hiveConf.setVar(METASTORE_USE_THRIFT_SASL, "true")
+      hiveConf.setVar(METASTORE_KERBEROS_PRINCIPAL, testPrincipal)
+      hiveConf.setVar(METASTORE_KERBEROS_KEYTAB_FILE, testKeytab)
+      val metaServer = new LocalMetaServer(hiveConf, classloader)
+      metaServer.start()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    FileUtils.deleteDirectory(hadoopConfDir)
+  }
+
+  test("obtain hive delegation token") {
+    tryWithSecurityEnabled {
+      UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
+
+      val kyuubiConf = new KyuubiConf(false)
+      val provider = new HiveDelegationTokenProvider
+      provider.initialize(hiveConf, kyuubiConf)
+      assert(provider.delegationTokensRequired())
+
+      val owner = "who"
+      val credentials = new Credentials
+      provider.obtainDelegationTokens(owner, credentials)
+
+      val token = credentials.getAllTokens.asScala
+        .filter(_.getKind == DelegationTokenIdentifier.HIVE_DELEGATION_KIND)
+        .head
+      assert(token != null)
+
+      val tokenIdent = 
token.decodeIdentifier().asInstanceOf[DelegationTokenIdentifier]
+      
assertResult(DelegationTokenIdentifier.HIVE_DELEGATION_KIND)(token.getKind)
+      assertResult(new Text(owner))(tokenIdent.getOwner)
+      val currentUserName = UserGroupInformation.getCurrentUser.getUserName
+      assertResult(new Text(currentUserName))(tokenIdent.getRealUser)
+    }
+  }
+}
+
+class LocalMetaServer(
+    hiveConf: HiveConf = defaultHiveConf(),
+    serverContextClassLoader: ClassLoader)
+    extends Logging {
+  import LocalMetaServer._
+
+  def start(): Unit = {
+    val startLock = new ReentrantLock
+    val startCondition = startLock.newCondition
+    val startedServing = new AtomicBoolean(false)
+    val startFailed = new AtomicBoolean(false)
+
+    Future {
+      try {
+        HiveMetaStore.startMetaStore(
+          port,
+          new HadoopThriftAuthBridgeWithServerContextClassLoader(
+            serverContextClassLoader),
+          hiveConf,
+          startLock,
+          startCondition,
+          startedServing)
+      } catch {
+        case t: Throwable =>
+          error("Failed to start LocalMetaServer", t)
+          startFailed.set(true)
+      }
+    }
+
+    eventually(timeout(30.seconds), interval(100.milliseconds)) {
+      assert(startedServing.get() || startFailed.get())
+    }
+
+    if (startFailed.get()) {
+      throw new HiveMetaException("Failed to start LocalMetaServer")
+    }
+  }
+
+  def getHiveConf: HiveConf = hiveConf
+}
+
+object LocalMetaServer {
+
+  private val port = 20101
+
+  def defaultHiveConf(): HiveConf = {
+    val hiveConf = new HiveConf()
+    hiveConf.setVar(METASTOREURIS, "thrift://localhost:" + port)
+    hiveConf.setVar(METASTORE_SCHEMA_VERIFICATION, "false")
+    hiveConf.set("datanucleus.schema.autoCreateTables", "true")
+    hiveConf
+  }
+
+}
+
+class HadoopThriftAuthBridgeWithServerContextClassLoader(classloader: 
ClassLoader)
+    extends HadoopThriftAuthBridge23 {
+
+  override def createServer(
+      keytabFile: String,
+      principalConf: String): HadoopThriftAuthBridge.Server = {
+    new Server(keytabFile, principalConf)
+  }
+
+  class Server(keytabFile: String, principalConf: String)
+      extends HadoopThriftAuthBridge.Server(keytabFile, principalConf) {
+
+    override def wrapProcessor(processor: TProcessor): TProcessor = {
+      new SetThreadContextClassLoaderProcess(super.wrapProcessor(processor))
+    }
+
+  }
+
+  class SetThreadContextClassLoaderProcess(wrapped: TProcessor) extends 
TProcessor {
+
+    override def process(in: TProtocol, out: TProtocol): Boolean = {
+      val origin = Thread.currentThread().getContextClassLoader
+      try {
+        Thread.currentThread().setContextClassLoader(classloader)
+        wrapped.process(in, out)
+      } finally {
+        Thread.currentThread().setContextClassLoader(origin)
+      }
+    }
+
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index c663977..be40371 100644
--- a/pom.xml
+++ b/pom.xml
@@ -810,6 +810,107 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.hive</groupId>
+                <artifactId>hive-metastore</artifactId>
+                <version>${hive.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.hive</groupId>
+                <artifactId>hive-metastore</artifactId>
+                <version>${hive.version}</version>
+                <type>test-jar</type>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.hive</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.thrift</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>co.cask.tephra</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>javolution</groupId>
+                        <artifactId>javolution</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.google.guava</groupId>
+                        <artifactId>guava</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.google.protobuf</groupId>
+                        <artifactId>protobuf-java</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.hbase</groupId>
+                        <artifactId>hbase-client</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.jobbox</groupId>
+                        <artifactId>bonecp</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.zaxxer</groupId>
+                        <artifactId>HikariCP</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>commons-cli</groupId>
+                        <artifactId>commons-cli</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>commons-lang</groupId>
+                        <artifactId>commons-lang</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>commons-pool</groupId>
+                        <artifactId>commons-pool</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>commons-dbcp</groupId>
+                        <artifactId>commons-dbcp</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.hive.shims</groupId>
+                <artifactId>hive-shims-common</artifactId>
+                <version>${hive.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>*</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.hive.shims</groupId>
+                <artifactId>hive-shims-0.23</artifactId>
+                <version>${hive.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>*</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.hive</groupId>
+                <artifactId>hive-exec</artifactId>
+                <version>${hive.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-framework</artifactId>
                 <version>${curator.version}</version>

Reply via email to