This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e23e7a  [KYUUBI #1978] Support NEGOTIATE/BASIC authorization for 
restful frontend service
1e23e7a is described below

commit 1e23e7a93cbfa9c11f42e739d7b96fef9b7a0bf4
Author: Fei Wang <[email protected]>
AuthorDate: Thu Mar 10 21:04:20 2022 +0800

    [KYUUBI #1978] Support NEGOTIATE/BASIC authorization for restful frontend 
service
    
    ### _Why are the changes needed?_
    
    to close #1978
    
    Support both NEGOTIATE and BASIC authentication for restful frontend 
service.
    
    First, I introduce two auth schemes; for reference, see 
[hadoop/HttpConstants.java](https://github.com/apache/hadoop/blob/trunk/hadoop-common-project%2Fhadoop-auth%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fhadoop%2Fsecurity%2Fauthentication%2Fserver%2FHttpConstants.java)
    - BASIC: mapping to NOSASL, NONE, LDAP, CUSTOM authentication
    - NEGOTIATE: mapping to KERBEROS authentication
    
    BTW, hadoop also supports `Digest` auth scheme.
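    
    The way the configured authentication types select a BASIC handler can be sketched as follows. This is a simplified illustration that uses plain strings instead of Kyuubi's `AuthTypes` enumeration; `SchemeMappingSketch`, `basicTypeOf`, and `negotiateEnabled` are hypothetical names, and the selection rule is meant to mirror the `AuthenticationFilter.init` logic in this patch.
    
    ```scala
    object SchemeMappingSketch {
      // NEGOTIATE is enabled whenever KERBEROS is among the configured types.
      def negotiateEnabled(authTypes: Seq[String]): Boolean =
        authTypes.contains("KERBEROS")
    
      // BASIC is backed by NOSASL only when NOSASL is the sole configured type;
      // otherwise by the first type that is neither KERBEROS nor NOSASL.
      def basicTypeOf(authTypes: Seq[String]): Option[String] =
        if (authTypes == Seq("NOSASL")) {
          authTypes.headOption
        } else {
          authTypes.filterNot(t => t == "KERBEROS" || t == "NOSASL").headOption
        }
    }
    ```
    
    With `Seq("KERBEROS", "LDAP")`, both schemes would be available, with LDAP backing BASIC; with `Seq("KERBEROS")` alone, only NEGOTIATE would be.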
    
    Two authentication handlers:
    - BasicAuthenticationHandler(reuse existing 
passwdAuthenticationProvider(LDAP/CUSTOM))
    - KerberosAuthenticationHandler(refer 
[hadoop/KerberosAuthenticationHandler.java](https://github.com/apache/hadoop/blob/trunk/hadoop-common-project%2Fhadoop-auth%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fhadoop%2Fsecurity%2Fauthentication%2Fserver%2FKerberosAuthenticationHandler.java)
 and 
[hadoop/KerberosUtil.java](https://github.com/apache/hadoop/blob/trunk/hadoop-common-project%2Fhadoop-auth%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fhadoop%2Fsecurity%2Fauthentication%2Futil%2FKerberosUtil.java))
    
    #### AuthenticationFilter
    - dispatches the HTTP request to the authentication handler that matches the scheme specified in the `Authorization` header.
    
    For basic authentication:
    The value of the Authorization header is `BASIC ${encodeBase64(user + ":" + password)}`.
    The passwdAuthenticationProvider is then used to authenticate `user` and `password`.
    
    For SPNEGO (Kerberos) authentication:
    The value of the Authorization header is `NEGOTIATE ${encodeBase64(clientToken)}`.
    The handler then checks whether the clientToken is valid.
    
    Currently, the AuthenticationFilter takes effect for all requests with pathSpec `/api/*`.
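    
    A minimal sketch of how a client might build the BASIC header value described above, and how a server might decode it. `BasicHeaderSketch`, `encode`, and `decode` are hypothetical names for illustration, not part of this patch. Unlike the handler in the diff, the sketch splits on the first colon only, so a password containing `:` is kept whole.
    
    ```scala
    import java.nio.charset.StandardCharsets
    import java.util.Base64
    
    object BasicHeaderSketch {
      private val Scheme = "BASIC"
    
      // Build the header value: the scheme name followed by base64("user:password").
      def encode(user: String, password: String): String = {
        val raw = s"$user:$password".getBytes(StandardCharsets.UTF_8)
        Scheme + " " + Base64.getEncoder.encodeToString(raw)
      }
    
      // Return (user, password) when the header carries the BASIC scheme and
      // a payload of the form "user:password"; otherwise None.
      // Note: an invalid base64 payload makes the decoder throw IllegalArgumentException.
      def decode(header: String): Option[(String, String)] = {
        val trimmed = header.trim
        // Case-insensitive prefix match on the scheme name.
        if (!trimmed.regionMatches(true, 0, Scheme, 0, Scheme.length)) {
          None
        } else {
          val payload = trimmed.substring(Scheme.length).trim
          val decoded = new String(Base64.getDecoder.decode(payload), StandardCharsets.UTF_8)
          val creds = decoded.split(":", 2)
          if (creds.length == 2) Some((creds(0), creds(1))) else None
        }
      }
    }
    ```
    
    A NEGOTIATE header has the same shape, with the base64-encoded SPNEGO client token as the payload instead of the credentials.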
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #2049 from turboFei/kyuubi_1978_kerberos.
    
    Closes #1978
    
    c8b6362b [Fei Wang] refactor
    cedec70f [Fei Wang] add spnego unit test
    c1d45cde [Fei Wang] rename package to http.authentication
    3d0b220b [Fei Wang] address comments
    296f181e [Fei Wang] fix ut
    f9371e14 [Fei Wang] spnego and basic auth
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: Fei Wang <[email protected]>
---
 docs/deployment/settings.md                        |   8 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  14 ++
 .../org/apache/kyuubi/KerberizedTestHelper.scala   |  47 ++++-
 .../kyuubi/server/KyuubiRestFrontendService.scala  |  11 +-
 .../server/http/authentication/AuthSchemes.scala   |  24 +++
 .../http/authentication/AuthenticationFilter.scala | 167 ++++++++++++++++
 .../authentication/AuthenticationHandler.scala     | 121 ++++++++++++
 .../BasicAuthenticationHandler.scala               |  93 +++++++++
 .../KerberosAuthenticationHandler.scala            | 169 +++++++++++++++++
 .../server/http/authentication/KerberosUtil.scala  | 211 +++++++++++++++++++++
 .../org/apache/kyuubi/server/ui/JettyServer.scala  |  25 ++-
 .../org/apache/kyuubi/RestFrontendTestHelper.scala |   2 +-
 .../operation/KyuubiRestAuthenticationSuite.scala  | 148 +++++++++++++++
 .../authentication/AuthenticationFilterSuite.scala |  38 ++++
 14 files changed, 1072 insertions(+), 6 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 38a176d..e478d02 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -337,6 +337,14 @@ Key | Default | Meaning | Type | Since
 <code>kyuubi.session.timeout</code>|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT6H</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>(deprecated)session timeout, it will be closed 
when it's not accessed for this duration</div>|<div style='width: 
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 
 
+### Spnego
+
+Key | Default | Meaning | Type | Since
+--- | --- | --- | --- | ---
+<code>kyuubi.spnego.keytab</code>|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Keytab file for SPNego 
principal</div>|<div style='width: 30pt'>string</div>|<div style='width: 
20pt'>1.6.0</div>
+<code>kyuubi.spnego.principal</code>|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>SPNego service principal, 
typical value would look like HTTP/[email protected]. SPNego service principal 
would be used when restful Kerberos security is enabled. This needs to be set 
only if SPNEGO is to be used in authentication.</div>|<div style='width: 
30pt'>string</div>|<div style='width: 20 [...]
+
+
 ### Zookeeper
 
 Key | Default | Meaning | Type | Since
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 53e3f93..5affad3 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -197,6 +197,20 @@ object KyuubiConf {
     .stringConf
     .createOptional
 
+  val SERVER_SPNEGO_KEYTAB: OptionalConfigEntry[String] = 
buildConf("spnego.keytab")
+    .doc("Keytab file for SPNego principal")
+    .version("1.6.0")
+    .stringConf
+    .createOptional
+
+  val SERVER_SPNEGO_PRINCIPAL: OptionalConfigEntry[String] = 
buildConf("spnego.principal")
+    .doc("SPNego service principal, typical value would look like 
HTTP/[email protected]." +
+      " SPNego service principal would be used when restful Kerberos security 
is enabled." +
+      " This needs to be set only if SPNEGO is to be used in authentication.")
+    .version("1.6.0")
+    .stringConf
+    .createOptional
+
   val KINIT_INTERVAL: ConfigEntry[Long] = buildConf("kinit.interval")
     .doc("How often will Kyuubi server run `kinit -kt [keytab] [principal]` to 
renew the" +
       " local Kerberos credentials cache")
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
index 05a97ee..ceb476c 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
@@ -20,6 +20,9 @@ package org.apache.kyuubi
 import java.io.File
 import java.nio.charset.StandardCharsets
 import java.nio.file.Files
+import java.security.PrivilegedExceptionAction
+import java.util.Base64
+import javax.security.sasl.AuthenticationException
 
 import scala.io.{Codec, Source}
 import scala.util.control.NonFatal
@@ -27,6 +30,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.hadoop.security.UserGroupInformation
+import org.ietf.jgss.{GSSContext, GSSException, GSSManager, GSSName}
 import org.scalatest.time.SpanSugar._
 
 trait KerberizedTestHelper extends KyuubiFunSuite {
@@ -54,6 +58,7 @@ trait KerberizedTestHelper extends KyuubiFunSuite {
   private val keytabFile = new File(baseDir, "kyuubi-test.keytab")
   protected val testKeytab: String = keytabFile.getAbsolutePath
   protected var testPrincipal: String = _
+  protected var testSpnegoPrincipal: String = _
 
   override def beforeAll(): Unit = {
     eventually(timeout(60.seconds), interval(1.second)) {
@@ -71,10 +76,13 @@ trait KerberizedTestHelper extends KyuubiFunSuite {
       }
     }
     val tempTestPrincipal = s"client/$hostName"
-    kdc.createPrincipal(keytabFile, tempTestPrincipal)
+    val tempSpnegoPrincipal = s"HTTP/$hostName"
+    kdc.createPrincipal(keytabFile, tempTestPrincipal, tempSpnegoPrincipal)
     rewriteKrb5Conf()
     testPrincipal = tempTestPrincipal + "@" + kdc.getRealm
+    testSpnegoPrincipal = tempSpnegoPrincipal + "@" + kdc.getRealm
     info(s"KerberizedTest Principal: $testPrincipal")
+    info(s"KerberizedTest SPNEGO Principal: $testSpnegoPrincipal")
     info(s"KerberizedTest Keytab: $testKeytab")
     super.beforeAll()
   }
@@ -148,4 +156,41 @@ trait KerberizedTestHelper extends KyuubiFunSuite {
       assert(!UserGroupInformation.isSecurityEnabled)
     }
   }
+
+  /**
+   * Generate SPNEGO challenge request token.
+   * Copy from Apache Hadoop YarnClientUtils::generateToken
+   *
+   * @param server - hostname to contact
+   * @return SPNEGO token challenge
+   */
+  def generateToken(server: String): String = {
+    val currentUser = UserGroupInformation.getCurrentUser
+    currentUser.doAs(new PrivilegedExceptionAction[String] {
+      override def run(): String =
+        try {
+          val manager = GSSManager.getInstance()
+          val serverName = manager.createName("HTTP@" + server, 
GSSName.NT_HOSTBASED_SERVICE)
+          val gssContext = manager.createContext(
+            serverName.canonicalize(null),
+            null,
+            null,
+            GSSContext.DEFAULT_LIFETIME)
+          // Create a GSSContext for authentication with the service.
+          // We're passing client credentials as null since we want them to
+          // be read from the Subject.
+          // We're passing Oid as null to use the default.
+          gssContext.requestMutualAuth(true)
+          gssContext.requestCredDeleg(true)
+          // Establish context
+          val inToken = Array.empty[Byte]
+          val outToken = gssContext.initSecContext(inToken, 0, inToken.length)
+          gssContext.dispose()
+          new String(Base64.getEncoder.encode(outToken), 
StandardCharsets.UTF_8)
+        } catch {
+          case e: GSSException =>
+            throw new AuthenticationException("Failed to generate token", e)
+        }
+    })
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index ed571b7..3cb1794 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -17,12 +17,17 @@
 
 package org.apache.kyuubi.server
 
+import java.util.EnumSet
 import java.util.concurrent.atomic.AtomicBoolean
+import javax.servlet.DispatcherType
+
+import org.eclipse.jetty.servlet.FilterHolder
 
 import org.apache.kyuubi.{KyuubiException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, 
FRONTEND_REST_BIND_PORT}
 import org.apache.kyuubi.server.api.v1.ApiRootResource
+import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
 import org.apache.kyuubi.server.ui.JettyServer
 import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
 
@@ -50,7 +55,11 @@ class KyuubiRestFrontendService(override val serverable: 
Serverable)
   }
 
   private def startInternal(): Unit = {
-    server.addHandler(ApiRootResource.getServletHandler(this))
+    val contextHandler = ApiRootResource.getServletHandler(this)
+    val holder = new FilterHolder(new AuthenticationFilter(conf))
+    contextHandler.addFilter(holder, "/*", 
EnumSet.allOf(classOf[DispatcherType]))
+    server.addHandler(contextHandler)
+
     server.addStaticHandler("org/apache/kyuubi/ui/static", "/static")
     server.addRedirectHandler("/", "/static")
     server.addStaticHandler("org/apache/kyuubi/ui/swagger", "/swagger")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthSchemes.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthSchemes.scala
new file mode 100644
index 0000000..c7924ab
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthSchemes.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.http.authentication
+
+object AuthSchemes extends Enumeration {
+  type AuthScheme = Value
+
+  val BASIC, NEGOTIATE = Value
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationFilter.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationFilter.scala
new file mode 100644
index 0000000..360d8c3
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationFilter.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.http.authentication
+
+import java.io.IOException
+import javax.servlet.{Filter, FilterChain, FilterConfig, ServletException, 
ServletRequest, ServletResponse}
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.security.authentication.client.AuthenticationException
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.AUTHENTICATION_METHOD
+import org.apache.kyuubi.service.authentication.AuthTypes
+import org.apache.kyuubi.service.authentication.AuthTypes.{KERBEROS, NOSASL}
+
+class AuthenticationFilter(conf: KyuubiConf) extends Filter with Logging {
+  import AuthenticationFilter._
+  import AuthenticationHandler._
+  import AuthSchemes._
+
+  private[authentication] val authSchemeHandlers = new HashMap[AuthScheme, 
AuthenticationHandler]()
+
+  private[authentication] def addAuthHandler(authHandler: 
AuthenticationHandler): Unit = {
+    authHandler.init(conf)
+    if (authHandler.authenticationSupported) {
+      if (authSchemeHandlers.contains(authHandler.authScheme)) {
+        warn(s"Authentication handler has been defined for scheme 
${authHandler.authScheme}")
+      } else {
+        info(s"Add authentication handler 
${authHandler.getClass.getSimpleName}" +
+          s" for scheme ${authHandler.authScheme}")
+        authSchemeHandlers.put(authHandler.authScheme, authHandler)
+      }
+    } else {
+      warn(s"The authentication handler ${authHandler.getClass.getSimpleName}" 
+
+        s" for scheme ${authHandler.authScheme} is not supported")
+    }
+  }
+
+  override def init(filterConfig: FilterConfig): Unit = {
+    val authTypes = conf.get(AUTHENTICATION_METHOD).map(AuthTypes.withName)
+    val spnegoKerberosEnabled = authTypes.contains(KERBEROS)
+    val basicAuthTypeOpt = {
+      if (authTypes == Seq(NOSASL)) {
+        authTypes.headOption
+      } else {
+        
authTypes.filterNot(_.equals(KERBEROS)).filterNot(_.equals(NOSASL)).headOption
+      }
+    }
+    if (spnegoKerberosEnabled) {
+      val kerberosHandler = new KerberosAuthenticationHandler
+      addAuthHandler(kerberosHandler)
+    }
+    basicAuthTypeOpt.foreach { basicAuthType =>
+      val basicHandler = new BasicAuthenticationHandler(basicAuthType)
+      addAuthHandler(basicHandler)
+    }
+    super.init(filterConfig)
+  }
+
+  /**
+   * If the request has a valid authentication token it allows the request to 
continue to the
+   * target resource, otherwise it triggers an authentication sequence using 
the configured
+   * {@link AuthenticationHandler}.
+   *
+   * @param request     the request object.
+   * @param response    the response object.
+   * @param filterChain the filter chain object.
+   * @throws IOException      thrown if an IO error occurred.
+   * @throws ServletException thrown if a processing error occurred.
+   */
+  override def doFilter(
+      request: ServletRequest,
+      response: ServletResponse,
+      filterChain: FilterChain): Unit = {
+    val httpRequest = request.asInstanceOf[HttpServletRequest]
+    val httpResponse = response.asInstanceOf[HttpServletResponse]
+
+    val authorization = httpRequest.getHeader(AUTHORIZATION_HEADER)
+    var matchedHandler: AuthenticationHandler = null
+
+    for (authHandler <- authSchemeHandlers.values if matchedHandler == null) {
+      if (authHandler.matchAuthScheme(authorization)) {
+        matchedHandler = authHandler
+      }
+    }
+
+    if (matchedHandler == null) {
+      debug(s"No auth scheme matched for url: ${httpRequest.getRequestURL}")
+      httpResponse.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+      httpResponse.sendError(
+        HttpServletResponse.SC_UNAUTHORIZED,
+        s"No auth scheme matched for $authorization")
+    } else {
+      HTTP_CLIENT_IP_ADDRESS.set(httpRequest.getRemoteAddr)
+      try {
+        val authUser = matchedHandler.authenticate(httpRequest, httpResponse)
+        if (authUser != null) {
+          HTTP_CLIENT_USER_NAME.set(authUser)
+          doFilter(filterChain, httpRequest, httpResponse)
+        }
+      } catch {
+        case e: AuthenticationException =>
+          HTTP_CLIENT_USER_NAME.remove()
+          HTTP_CLIENT_IP_ADDRESS.remove()
+          httpResponse.setStatus(HttpServletResponse.SC_FORBIDDEN)
+          httpResponse.sendError(HttpServletResponse.SC_FORBIDDEN, 
e.getMessage)
+      }
+    }
+  }
+
+  /**
+   * Delegates call to the servlet filter chain. Sub-classes may override this
+   * method to perform pre and post tasks.
+   *
+   * @param filterChain the filter chain object.
+   * @param request     the request object.
+   * @param response    the response object.
+   * @throws IOException      thrown if an IO error occurred.
+   * @throws ServletException thrown if a processing error occurred.
+   */
+  @throws[IOException]
+  @throws[ServletException]
+  protected def doFilter(
+      filterChain: FilterChain,
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    filterChain.doFilter(request, response)
+  }
+
+  override def destroy(): Unit = {
+    if (!authSchemeHandlers.isEmpty) {
+      authSchemeHandlers.values.foreach(_.destroy())
+      authSchemeHandlers.clear()
+    }
+  }
+}
+
+object AuthenticationFilter {
+  final val HTTP_CLIENT_IP_ADDRESS = new ThreadLocal[String]() {
+    override protected def initialValue: String = null
+  }
+  final val HTTP_CLIENT_USER_NAME = new ThreadLocal[String]() {
+    override protected def initialValue: String = null
+  }
+
+  def getUserIpAddress: String = HTTP_CLIENT_IP_ADDRESS.get
+
+  def getUserName: String = HTTP_CLIENT_USER_NAME.get
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationHandler.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationHandler.scala
new file mode 100644
index 0000000..f606253
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/AuthenticationHandler.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.http.authentication
+
+import javax.security.sasl.AuthenticationException
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.hadoop.security.authentication.server.HttpConstants
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.http.authentication.AuthSchemes.AuthScheme
+
+trait AuthenticationHandler {
+  import AuthenticationHandler._
+
+  /**
+   * HTTP header prefix used during the authentication sequence.
+   */
+  val authScheme: AuthScheme
+
+  /**
+   *  Initializes the authentication handler instance.
+   */
+  def init(conf: KyuubiConf): Unit
+
+  /**
+   * Whether this authentication handler is properly configured and supports 
authentication.
+   */
+  def authenticationSupported: Boolean
+
+  /**
+   * Destroys the authentication handler instance.
+   * <p>
+   * This method is invoked by the {@link AuthenticationFilter# destroy} 
method.
+   */
+  def destroy(): Unit
+
+  /**
+   * Performs an authentication step for the given HTTP client request.
+   * <p>
+   * This method is invoked by the {@link AuthenticationFilter} only if the 
HTTP client request is
+   * not yet authenticated.
+   * <p>
+   * Depending upon the authentication mechanism being implemented, a 
particular HTTP client may
+   * end up making a sequence of invocations before authentication is 
successfully established
+   * (this is the case of Kerberos SPNEGO).
+   * <p>
+   * This method must return a user name only if the HTTP client request 
has been successfully
+   * and fully authenticated.
+   * <p>
+   * If the HTTP client request has not been completely authenticated, this 
method must take over
+   * the corresponding HTTP response and it must return <code>null</code>.
+   *
+   * @param request  the HTTP client request.
+   * @param response the HTTP client response.
+   * @return the user name
+   * @throws AuthenticationException thrown if an Authentication error 
occurred.
+   */
+  def authenticate(request: HttpServletRequest, response: 
HttpServletResponse): String
+
+  /**
+   * This method checks if the specified <code>authorization</code> belongs to 
the
+   * auth schema of current authentication handler.
+   * @param authorization Authentication header value which is to be compared 
with the
+   *                      authentication scheme.
+   */
+  def matchAuthScheme(authorization: String): Boolean = {
+    if (authorization == null || authorization.isEmpty) {
+      false
+    } else {
+      authorization.trim.regionMatches(true, 0, authScheme.toString, 0, 
authScheme.toString.length)
+    }
+  }
+
+  /**
+   * Get the decoded authorization value after auth schema in Authentication 
header.
+   */
+  def getAuthorization(request: HttpServletRequest): String = {
+    val authHeader = request.getHeader(AUTHORIZATION_HEADER)
+    // each http request must have an Authorization header
+    if (authHeader == null || authHeader.isEmpty) {
+      throw new AuthenticationException("Authorization header received from 
the client is empty.")
+    }
+
+    val authorization = authHeader.substring(authScheme.toString.length).trim
+    // Authorization header must have a payload
+    if (authorization == null || authorization.isEmpty()) {
+      throw new AuthenticationException(
+        "Authorization header received from the client does not contain any 
data.")
+    }
+    authorization
+  }
+}
+
+object AuthenticationHandler {
+
+  /**
+   * HTTP header used by the SPNEGO server endpoint during an authentication 
sequence.
+   */
+  final val WWW_AUTHENTICATE: String = HttpConstants.WWW_AUTHENTICATE_HEADER
+
+  /**
+   * HTTP header used by the client endpoint during an authentication sequence.
+   */
+  final val AUTHORIZATION_HEADER = "Authorization"
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/BasicAuthenticationHandler.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/BasicAuthenticationHandler.scala
new file mode 100644
index 0000000..86db1af
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/BasicAuthenticationHandler.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.http.authentication
+
+import java.nio.charset.Charset
+import java.util.Base64
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.http.authentication.AuthSchemes.AuthScheme
+import 
org.apache.kyuubi.service.authentication.{AuthenticationProviderFactory, 
AuthMethods}
+import org.apache.kyuubi.service.authentication.AuthTypes._
+
+class BasicAuthenticationHandler(basicAuthType: AuthType)
+  extends AuthenticationHandler with Logging {
+  import AuthenticationHandler._
+
+  private var conf: KyuubiConf = _
+  private val allowAnonymous = basicAuthType == NOSASL || basicAuthType == NONE
+
+  override val authScheme: AuthScheme = AuthSchemes.BASIC
+
+  override def init(conf: KyuubiConf): Unit = {
+    this.conf = conf
+  }
+
+  override def authenticationSupported: Boolean = {
+    basicAuthType != null
+  }
+
+  override def matchAuthScheme(authorization: String): Boolean = {
+    if (authorization == null || authorization.isEmpty) {
+      allowAnonymous
+    } else {
+      super.matchAuthScheme(authorization)
+    }
+  }
+
+  override def getAuthorization(request: HttpServletRequest): String = {
+    val authHeader = request.getHeader(AUTHORIZATION_HEADER)
+    if (allowAnonymous && (authHeader == null || authHeader.isEmpty)) {
+      ""
+    } else {
+      super.getAuthorization(request)
+    }
+  }
+
+  override def authenticate(
+      request: HttpServletRequest,
+      response: HttpServletResponse): String = {
+    var authUser: String = null
+
+    val authorization = getAuthorization(request)
+    val inputToken = Option(authorization).map(a => 
Base64.getDecoder.decode(a.getBytes()))
+      .getOrElse(Array.empty[Byte])
+    val creds = new String(inputToken, Charset.forName("UTF-8")).split(":")
+
+    if (allowAnonymous) {
+      authUser = creds.take(1).headOption.getOrElse("anonymous")
+    } else {
+      if (creds.size < 2 || creds(0).trim.isEmpty || creds(1).trim.isEmpty) {
+        response.setHeader(WWW_AUTHENTICATE, authScheme.toString)
+        response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+      } else {
+        val Seq(user, password) = creds.toSeq.take(2)
+        val passwdAuthenticationProvider = AuthenticationProviderFactory
+          
.getAuthenticationProvider(AuthMethods.withName(basicAuthType.toString), conf)
+        passwdAuthenticationProvider.authenticate(user, password)
+        response.setStatus(HttpServletResponse.SC_OK)
+        authUser = user
+      }
+    }
+    authUser
+  }
+
+  override def destroy(): Unit = {}
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/KerberosAuthenticationHandler.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/KerberosAuthenticationHandler.scala
new file mode 100644
index 0000000..19a31fe
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/KerberosAuthenticationHandler.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.http.authentication
+
+import java.io.{File, IOException}
+import java.security.{PrivilegedActionException, PrivilegedExceptionAction}
+import java.util.Base64
+import javax.security.auth.Subject
+import javax.security.auth.kerberos.{KerberosPrincipal, KeyTab}
+import javax.security.sasl.AuthenticationException
+import javax.servlet.ServletException
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.hadoop.security.authentication.util.KerberosName
+import org.ietf.jgss.{GSSContext, GSSCredential, GSSManager, Oid}
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+
+class KerberosAuthenticationHandler extends AuthenticationHandler with Logging 
{
+  import AuthenticationHandler._
+  import AuthSchemes._
+  import KerberosUtil._
+
+  private var gssManager: GSSManager = _
+  private var conf: KyuubiConf = _
+  private var serverSubject = new Subject()
+  private var keytab: String = _
+  private var principal: String = _
+
+  override val authScheme: AuthScheme = AuthSchemes.NEGOTIATE
+
+  override def authenticationSupported: Boolean = {
+    !keytab.isEmpty && !principal.isEmpty
+  }
+
+  override def init(conf: KyuubiConf): Unit = {
+    this.conf = conf
+    keytab = conf.get(KyuubiConf.SERVER_SPNEGO_KEYTAB).getOrElse("")
+    principal = conf.get(KyuubiConf.SERVER_SPNEGO_PRINCIPAL).getOrElse("")
+    if (authenticationSupported) {
+      val keytabFile = new File(keytab)
+      if (!keytabFile.exists()) {
+        throw new ServletException(s"Keytab[$keytab] does not exist")
+      }
+      if (!principal.startsWith("HTTP/")) {
+        throw new ServletException(s"SPNEGO principal[$principal] does not start with HTTP/")
+      }
+
+      info(s"Using keytab $keytab, for principal $principal")
+      serverSubject.getPrivateCredentials().add(KeyTab.getInstance(keytabFile))
+      serverSubject.getPrincipals.add(new KerberosPrincipal(principal))
+
+      // TODO: support configuring kerberos.name.rules and kerberos.rule.mechanism
+      // Set default rules if none have been set; otherwise parsing the
+      // Kerberos name throws an exception.
+      if (!KerberosName.hasRulesBeenSet) {
+        KerberosName.setRules("DEFAULT")
+      }
+
+      try {
+        gssManager = Subject.doAs(
+          serverSubject,
+          new PrivilegedExceptionAction[GSSManager] {
+            override def run(): GSSManager = {
+              GSSManager.getInstance()
+            }
+          })
+      } catch {
+        case e: PrivilegedActionException => throw e.getException
+        case e: Exception => throw new ServletException(e)
+      }
+    }
+  }
+
+  override def destroy(): Unit = {
+    keytab = null
+    serverSubject = null
+  }
+
+  override def authenticate(
+      request: HttpServletRequest,
+      response: HttpServletResponse): String = {
+    var authUser: String = null
+    val authorization = getAuthorization(request)
+    val clientToken = Base64.getDecoder.decode(authorization)
+    try {
+      val serverPrincipal = getTokenServerName(clientToken)
+      if (!serverPrincipal.startsWith("HTTP/")) {
+        throw new IllegalArgumentException(
+          s"Invalid server principal $serverPrincipal decoded from client request")
+      }
+      authUser = Subject.doAs(
+        serverSubject,
+        new PrivilegedExceptionAction[String] {
+          override def run(): String = {
+            runWithPrincipal(serverPrincipal, clientToken, response)
+          }
+        })
+    } catch {
+      case ex: PrivilegedActionException =>
+        ex.getException match {
+          case ioe: IOException =>
+            throw ioe
+          case e: Exception => throw new AuthenticationException("SPNEGO authentication failed", e)
+        }
+
+      case e: Exception => throw new AuthenticationException("SPNEGO authentication failed", e)
+    }
+    authUser
+  }
+
+  def runWithPrincipal(
+      serverPrincipal: String,
+      clientToken: Array[Byte],
+      response: HttpServletResponse): String = {
+    var gssContext: GSSContext = null
+    var gssCreds: GSSCredential = null
+    var authUser: String = null
+    try {
+      debug(s"SPNEGO initialized with server principal $serverPrincipal")
+      gssCreds = gssManager.createCredential(
+        gssManager.createName(serverPrincipal, NT_GSS_KRB5_PRINCIPAL_OID),
+        GSSCredential.INDEFINITE_LIFETIME,
+        Array[Oid](GSS_SPNEGO_MECH_OID, GSS_KRB5_MECH_OID),
+        GSSCredential.ACCEPT_ONLY)
+      gssContext = gssManager.createContext(gssCreds)
+      val serverToken = gssContext.acceptSecContext(clientToken, 0, clientToken.length)
+      if (serverToken != null && serverToken.length > 0) {
+        val authenticate = Base64.getEncoder.encodeToString(serverToken)
+        response.setHeader(WWW_AUTHENTICATE, s"$NEGOTIATE $authenticate")
+      }
+      if (!gssContext.isEstablished) {
+        response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+        debug("SPNEGO in progress")
+      } else {
+        val clientPrincipal = gssContext.getSrcName.toString
+        val kerberosName = new KerberosName(clientPrincipal)
+        val userName = kerberosName.getShortName
+        authUser = userName
+        response.setStatus(HttpServletResponse.SC_OK)
+        debug(s"SPNEGO completed for client principal $clientPrincipal")
+      }
+    } finally {
+      if (gssContext != null) {
+        gssContext.dispose()
+      }
+      if (gssCreds != null) {
+        gssCreds.dispose()
+      }
+    }
+    authUser
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/KerberosUtil.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/KerberosUtil.scala
new file mode 100644
index 0000000..8ff0793
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/http/authentication/KerberosUtil.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.http.authentication
+
+import java.io.UnsupportedEncodingException
+import java.nio.ByteBuffer
+import java.nio.charset.IllegalCharsetNameException
+
+import org.ietf.jgss.{GSSException, Oid}
+
+/**
+ * Copied from Apache Hadoop `KerberosUtil`
+ */
+object KerberosUtil {
+  // Oid for SPNego GSS-API mechanism.
+  val GSS_SPNEGO_MECH_OID: Oid = getNumericOidInstance("1.3.6.1.5.5.2")
+  // Oid for the Kerberos GSS-API mechanism.
+  val GSS_KRB5_MECH_OID: Oid = getNumericOidInstance("1.2.840.113554.1.2.2")
+  // Oid for kerberos principal name
+  val NT_GSS_KRB5_PRINCIPAL_OID: Oid = getNumericOidInstance("1.2.840.113554.1.2.2.1")
+
+  // numeric oids will never generate a GSSException for a malformed oid.
+  // use to initialize statics.
+  private def getNumericOidInstance(oidName: String): Oid =
+    try {
+      new Oid(oidName)
+    } catch {
+      case ex: GSSException =>
+        throw new IllegalArgumentException(ex)
+    }
+
+  // basic ASN.1 DER decoder to traverse encoded byte arrays.
+  private class DER(val srcbb: ByteBuffer) extends java.util.Iterator[DER] {
+    import DER._
+
+    final private val tag: Int = srcbb.get() & 0xFF
+    val length = readLength(srcbb)
+    final private val bb: ByteBuffer = srcbb.slice
+    bb.limit(length)
+    srcbb.position(srcbb.position() + length)
+
+    def this(buf: Array[Byte]) {
+      this(ByteBuffer.wrap(buf))
+    }
+
+    def getTag: Int = {
+      tag
+    }
+
+    def choose(subtag: Int): DER = {
+      while (hasNext) {
+        val der: DER = next
+        if (der.getTag == subtag) {
+          return der
+        }
+      }
+      null
+    }
+
+    def get(tags: Int*): DER = {
+      var der: DER = this
+      for (i <- 0 until tags.length) {
+        val expectedTag: Int = tags(i)
+        // look for an exact match, else scan the children if it's a sequence.
+        if (der.getTag != expectedTag) {
+          der =
+            if (der.hasNext) {
+              der.choose(expectedTag)
+            } else {
+              null
+            }
+        }
+        if (der == null) {
+          val sb: StringBuilder = new StringBuilder("Tag not found:")
+          for (ii <- 0 to i) {
+            sb.append(" 0x").append(Integer.toHexString(tags(ii)))
+          }
+          throw new IllegalStateException(sb.toString)
+        }
+      }
+      der
+    }
+
+    def getAsString: String =
+      try {
+        new String(bb.array, bb.arrayOffset + bb.position(), bb.remaining, "UTF-8")
+      } catch {
+        case _: UnsupportedEncodingException =>
+          throw new IllegalCharsetNameException("UTF-8") // won't happen.
+      }
+
+    override def hashCode: Int = {
+      31 * tag + bb.hashCode
+    }
+
+    override def equals(o: Any): Boolean = {
+      o.isInstanceOf[DER] && tag.equals(o.asInstanceOf[DER].tag) &&
+      bb.equals(o.asInstanceOf[DER].bb)
+    }
+
+    override def hasNext: Boolean = {
+      // it's a sequence or an embedded octet.
+      ((tag & 0x30) != 0 || tag == 0x04) && bb.hasRemaining
+    }
+
+    override def next: DER = {
+      if (!hasNext) {
+        throw new NoSuchElementException
+      }
+      new DER(bb)
+    }
+
+    override def toString: String = {
+      "[tag=0x" + Integer.toHexString(tag) + " bb=" + bb + "]"
+    }
+  }
+
+  private object DER {
+    val SPNEGO_MECH_OID: DER = getDER(GSS_SPNEGO_MECH_OID)
+    val KRB5_MECH_OID: DER = getDER(GSS_KRB5_MECH_OID)
+
+    private def getDER(oid: Oid): DER =
+      try {
+        new DER(oid.getDER)
+      } catch {
+        case ex: GSSException =>
+          // won't happen.  a proper OID is encodable.
+          throw new IllegalArgumentException(ex)
+      }
+
+    // standard ASN.1 encoding.
+    private def readLength(bb: ByteBuffer): Int = {
+      var length: Int = bb.get
+      if ((length & 0x80.toByte) != 0) {
+        val varlength: Int = length & 0x7F
+        length = 0
+        (0 until varlength).foreach { _ =>
+          length = (length << 8) | (bb.get & 0xFF)
+        }
+      }
+      length
+    }
+  }
+
+  /**
+   * Extract the TGS server principal from the given gssapi kerberos or spnego
+   * wrapped token.
+   *
+   * @param rawToken bytes of the gss token
+   * @return String of server principal
+   * @throws IllegalArgumentException if token is undecodable
+   */
+  def getTokenServerName(rawToken: Array[Byte]): String = {
+    // subsequent comments include only relevant portions of the kerberos
+    // DER encoding that will be extracted.
+    var token = new DER(rawToken)
+    // InitialContextToken ::= [APPLICATION 0] IMPLICIT SEQUENCE {
+    //     mech   OID
+    //     mech-token  (NegotiationToken or InnerContextToken)
+    // }
+    var oid = token.next
+    if (oid.equals(DER.SPNEGO_MECH_OID)) {
+      // NegotiationToken ::= CHOICE {
+      //     neg-token-init[0] NegTokenInit
+      // NegTokenInit ::= SEQUENCE {
+      //     mech-token[2]     InitialContextToken
+      token = token.next.get(0xA0, 0x30, 0xA2, 0x04).next
+      oid = token.next
+    }
+    if (!oid.equals(DER.KRB5_MECH_OID)) {
+      throw new IllegalArgumentException("Malformed gss token")
+    }
+    // InnerContextToken ::= {
+    //     token-id[1]
+    //     AP-REQ
+    if (token.next.getTag != 1) throw new IllegalArgumentException("Not an AP-REQ token")
+    // AP-REQ ::= [APPLICATION 14] SEQUENCE {
+    //     ticket[3]      Ticket
+    val ticket = token.next.get(0x6E, 0x30, 0xA3, 0x61, 0x30)
+    // Ticket ::= [APPLICATION 1] SEQUENCE {
+    //     realm[1]       String
+    //     sname[2]       PrincipalName
+    // PrincipalName ::= SEQUENCE {
+    //     name-string[1] SEQUENCE OF String
+    val realm = ticket.get(0xA1, 0x1B).getAsString
+    val names = ticket.get(0xA2, 0x30, 0xA1, 0x30)
+    val sb = new StringBuilder
+    while (names.hasNext) {
+      if (sb.length > 0) {
+        sb.append('/')
+      }
+      sb.append(names.next.getAsString)
+    }
+    sb.append('@').append(realm).toString
+  }
+}
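
The `readLength` helper in the `DER` companion object above decodes an ASN.1 DER length field. As a rough standalone illustration of that encoding (not the committed code; `DerLengthSketch` is a hypothetical name used only here), the short and long forms can be sketched like this:

```scala
import java.nio.ByteBuffer

// Hypothetical helper, written only to illustrate DER length decoding.
object DerLengthSketch {
  // Reads a DER length field starting at the buffer's current position.
  def readLength(bb: ByteBuffer): Int = {
    val first = bb.get() & 0xFF
    if ((first & 0x80) == 0) {
      // Short form: the byte itself is the length (0 to 127).
      first
    } else {
      // Long form: the low 7 bits say how many big-endian length bytes follow.
      val count = first & 0x7F
      (0 until count).foldLeft(0) { (acc, _) => (acc << 8) | (bb.get() & 0xFF) }
    }
  }
}
```

For example, the single byte `0x05` decodes to a length of 5, while the bytes `0x82 0x01 0x00` decode to 256. The sketch assumes the length fits in an `Int` and does no bounds checking.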
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/ui/JettyServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/ui/JettyServer.scala
index ceea333..35faee1 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/ui/JettyServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/ui/JettyServer.scala
@@ -17,13 +17,16 @@
 
 package org.apache.kyuubi.server.ui
 
-import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector}
-import org.eclipse.jetty.server.handler.{ContextHandlerCollection, ErrorHandler}
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Request, Server, ServerConnector}
+import org.eclipse.jetty.server.handler.{ContextHandlerCollection, ErrorHandler, HandlerWrapper}
 import org.eclipse.jetty.servlet.ServletContextHandler
 import org.eclipse.jetty.util.component.LifeCycle
 import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}
 
 import org.apache.kyuubi.Utils.isWindows
+import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
 
 private[kyuubi] case class JettyServer(
     server: Server,
@@ -53,8 +56,24 @@ private[kyuubi] case class JettyServer(
   def getServerUri: String = connector.getHost + ":" + connector.getLocalPort
 
   def addHandler(handler: ServletContextHandler): Unit = synchronized {
-    rootHandler.addHandler(handler)
+    val handlerWrapper = new HandlerWrapper {
+      override def handle(
+          target: String,
+          baseRequest: Request,
+          request: HttpServletRequest,
+          response: HttpServletResponse): Unit = {
+        try {
+          super.handle(target, baseRequest, request, response)
+        } finally {
+          AuthenticationFilter.HTTP_CLIENT_USER_NAME.remove()
+          AuthenticationFilter.HTTP_CLIENT_IP_ADDRESS.remove()
+        }
+      }
+    }
+    handlerWrapper.setHandler(handler)
+    rootHandler.addHandler(handlerWrapper)
     if (!handler.isStarted) handler.start()
+    if (!handlerWrapper.isStarted) handlerWrapper.start()
   }
 
   def addStaticHandler(
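
The `HandlerWrapper` in the hunk above clears the per-request thread-locals in a `finally` block, presumably so that a pooled Jetty thread does not carry one request's user or IP address into the next. A minimal standalone sketch of that pattern, assuming a hypothetical `clientUser` thread-local standing in for `AuthenticationFilter.HTTP_CLIENT_USER_NAME`:

```scala
// Hypothetical names; this only illustrates the set / try / finally-remove pattern.
object ThreadLocalCleanupSketch {
  // Stand-in for a per-request value such as the authenticated user name.
  val clientUser: ThreadLocal[String] = new ThreadLocal[String]()

  // Binds `user` to the current thread while `body` runs, then always clears it,
  // even if `body` throws.
  def withUser[T](user: String)(body: => T): T = {
    clientUser.set(user)
    try body
    finally clientUser.remove()
  }
}
```

Inside `withUser("alice") { ... }` a call to `clientUser.get()` returns `"alice"`; once the block exits, normally or with an exception, the same call on that thread returns `null` again.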
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
index b9232b7..c081185 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
@@ -50,7 +50,7 @@ object RestFrontendTestHelper {
 
 trait RestFrontendTestHelper extends WithKyuubiServer {
 
-  override protected val conf: KyuubiConf = KyuubiConf()
+  override protected lazy val conf: KyuubiConf = KyuubiConf()
 
   override protected val frontendProtocols: Seq[FrontendProtocol] =
     FrontendProtocols.REST :: Nil
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiRestAuthenticationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiRestAuthenticationSuite.scala
new file mode 100644
index 0000000..cdd920d
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiRestAuthenticationSuite.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import java.util.Base64
+import javax.servlet.http.HttpServletResponse
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.kyuubi.{KerberizedTestHelper, RestFrontendTestHelper}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.api.v1.SessionOpenCount
+import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
+import org.apache.kyuubi.service.authentication.{UserDefineAuthenticationProviderImpl, WithLdapServer}
+
+class KyuubiRestAuthenticationSuite extends RestFrontendTestHelper with KerberizedTestHelper
+  with WithLdapServer {
+
+  private val customUser: String = "user"
+  private val customPasswd: String = "password"
+  private val currentUser = UserGroupInformation.getCurrentUser
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    System.clearProperty("java.security.krb5.conf")
+    UserGroupInformation.setLoginUser(currentUser)
+    UserGroupInformation.setConfiguration(new Configuration())
+    assert(!UserGroupInformation.isSecurityEnabled)
+    super.afterAll()
+  }
+
+  override protected lazy val conf: KyuubiConf = {
+    val config = new Configuration()
+    val authType = "hadoop.security.authentication"
+    config.set(authType, "KERBEROS")
+    System.setProperty("java.security.krb5.conf", krb5ConfPath)
+    UserGroupInformation.setConfiguration(config)
+    assert(UserGroupInformation.isSecurityEnabled)
+
+    KyuubiConf().set(KyuubiConf.AUTHENTICATION_METHOD, Seq("KERBEROS", "LDAP", "CUSTOM"))
+      .set(KyuubiConf.SERVER_KEYTAB.key, testKeytab)
+      .set(KyuubiConf.SERVER_PRINCIPAL, testPrincipal)
+      .set(KyuubiConf.SERVER_SPNEGO_KEYTAB, testKeytab)
+      .set(KyuubiConf.SERVER_SPNEGO_PRINCIPAL, testSpnegoPrincipal)
+      .set(KyuubiConf.AUTHENTICATION_LDAP_URL, ldapUrl)
+      .set(KyuubiConf.AUTHENTICATION_LDAP_BASEDN, ldapBaseDn)
+      .set(
+        KyuubiConf.AUTHENTICATION_CUSTOM_CLASS,
+        classOf[UserDefineAuthenticationProviderImpl].getCanonicalName)
+  }
+
+  test("test with LDAP authorization") {
+    val encodeAuthorization = new String(
+      Base64.getEncoder.encode(
+        s"$ldapUser:$ldapUserPasswd".getBytes()),
+      "UTF-8")
+    val response = webTarget.path("api/v1/sessions/count")
+      .request()
+      .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+      .get()
+
+    assert(HttpServletResponse.SC_OK == response.getStatus)
+    val openedSessionCount = response.readEntity(classOf[SessionOpenCount])
+    assert(openedSessionCount.openSessionCount == 0)
+  }
+
+  test("test with CUSTOM authorization") {
+    val encodeAuthorization = new String(
+      Base64.getEncoder.encode(
+        s"$customUser:$customPasswd".getBytes()),
+      "UTF-8")
+    val response = webTarget.path("api/v1/sessions/count")
+      .request()
+      .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+      .get()
+
+    assert(HttpServletResponse.SC_INTERNAL_SERVER_ERROR == response.getStatus)
+  }
+
+  test("test without authorization") {
+    val response = webTarget.path("api/v1/sessions/count")
+      .request()
+      .get()
+
+    assert(HttpServletResponse.SC_UNAUTHORIZED == response.getStatus)
+  }
+
+  test("test with valid spnego authentication") {
+    UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
+    val token = generateToken(hostName)
+    val response = webTarget.path("api/v1/sessions/count")
+      .request()
+      .header(AUTHORIZATION_HEADER, s"NEGOTIATE $token")
+      .get()
+
+    assert(HttpServletResponse.SC_OK == response.getStatus)
+  }
+
+  test("test with invalid spnego authorization") {
+    val encodeAuthorization = new String(
+      Base64.getEncoder.encode(
+        s"invalidKerberosToken".getBytes()),
+      "UTF-8")
+    val response = webTarget.path("api/v1/sessions/count")
+      .request()
+      .header(AUTHORIZATION_HEADER, s"NEGOTIATE $encodeAuthorization")
+      .get()
+
+    assert(HttpServletResponse.SC_INTERNAL_SERVER_ERROR == response.getStatus)
+  }
+
+  test("test with not supported auth scheme") {
+    val encodeAuthorization = new String(
+      Base64.getEncoder.encode(
+        s"$ldapUser:$ldapUserPasswd".getBytes()),
+      "UTF-8")
+    val response = webTarget.path("api/v1/sessions/count")
+      .request()
+      .header(AUTHORIZATION_HEADER, s"OTHER_SCHEME $encodeAuthorization")
+      .get()
+
+    assert(HttpServletResponse.SC_UNAUTHORIZED == response.getStatus)
+  }
+
+  test("test with non-authentication path") {
+    val response = webTarget.path("swagger").request().get()
+    assert(HttpServletResponse.SC_OK == response.getStatus)
+  }
+}
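
The tests above build the `Authorization` header by hand as `BASIC ` followed by the Base64 encoding of `user:password`. The following standalone sketch shows that round trip; it is not the committed `BasicAuthenticationHandler` logic, and `BasicHeaderSketch`, `encode` and `decode` are hypothetical names introduced only for illustration:

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// Hypothetical helper illustrating the BASIC header format used in the tests.
object BasicHeaderSketch {
  // Builds "BASIC <base64(user:password)>".
  def encode(user: String, password: String): String = {
    val raw = s"$user:$password".getBytes(StandardCharsets.UTF_8)
    "BASIC " + Base64.getEncoder.encodeToString(raw)
  }

  // Returns Some((user, password)) for a BASIC header, None for any other scheme
  // or for credentials without a ':' separator.
  // Note: Base64 decoding throws IllegalArgumentException on malformed input.
  def decode(header: String): Option[(String, String)] = {
    val parts = header.trim.split("\\s+", 2)
    if (parts.length != 2 || !parts(0).equalsIgnoreCase("BASIC")) {
      None
    } else {
      val creds = new String(Base64.getDecoder.decode(parts(1)), StandardCharsets.UTF_8)
      val idx = creds.indexOf(':')
      // Split on the first ':' only, so passwords may themselves contain ':'.
      if (idx < 0) None else Some((creds.substring(0, idx), creds.substring(idx + 1)))
    }
  }
}
```

With the credentials used by the CUSTOM test, `encode("user", "password")` yields `BASIC dXNlcjpwYXNzd29yZA==`, and `decode` rejects a header whose scheme is not `BASIC`, such as the `OTHER_SCHEME` case exercised above.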
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/http/authentication/AuthenticationFilterSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/http/authentication/AuthenticationFilterSuite.scala
new file mode 100644
index 0000000..9a79d79
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/http/authentication/AuthenticationFilterSuite.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.http.authentication
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.authentication.AuthTypes
+
+class AuthenticationFilterSuite extends KyuubiFunSuite {
+  test("add auth handler and destroy") {
+    val filter = new AuthenticationFilter(KyuubiConf())
+    filter.addAuthHandler(new BasicAuthenticationHandler(null))
+    assert(filter.authSchemeHandlers.size == 0)
+    filter.addAuthHandler(new BasicAuthenticationHandler(AuthTypes.LDAP))
+    assert(filter.authSchemeHandlers.size == 1)
+    filter.addAuthHandler(new BasicAuthenticationHandler(AuthTypes.LDAP))
+    assert(filter.authSchemeHandlers.size == 1)
+    filter.addAuthHandler(new KerberosAuthenticationHandler())
+    assert(filter.authSchemeHandlers.size == 1)
+    filter.destroy()
+    assert(filter.authSchemeHandlers.isEmpty)
+  }
+}
