This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cea9cc7f [CELEBORN-1318] Support celeborn http authentication
5cea9cc7f is described below

commit 5cea9cc7f21577a9b660ec84761efdd82e0ef7e9
Author: Fei Wang <[email protected]>
AuthorDate: Thu Jun 20 10:35:12 2024 +0800

    [CELEBORN-1318] Support celeborn http authentication
    
    ### What changes were proposed in this pull request?
    Support celeborn master/worker http authentication.
    
    ### Why are the changes needed?
    Authentication is needed for celeborn admin APIs.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. This introduces authentication-related config items, but it does not
    break the current behavior.
    
    ### How was this patch tested?
    
    Added UT for BASIC and Bearer authentication.
    
    Closes #2440 from turboFei/http_auth.
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 132 ++++++++++++-
 .../AnonymousAuthenticationProviderImpl.scala      |  33 ++++
 .../common/authentication/BasicPrincipal.scala     |  40 ++++
 .../common/authentication/HttpAuthSchemes.scala    |  23 +++
 .../PasswdAuthenticationProvider.scala             |  37 ++++
 .../TokenAuthenticationProvider.scala              |  37 ++++
 docs/configuration/master.md                       |   6 +
 docs/configuration/worker.md                       |   6 +
 .../ApiMasterResourceAuthenticationSuite.scala     |  64 +++++++
 .../celeborn/server/common/HttpService.scala       |  11 +-
 .../server/common/http/HttpAuthUtils.scala         |  25 +++
 .../authentication/AuthenticationAuditLogger.scala |  43 +++++
 .../http/authentication/AuthenticationFilter.scala | 205 +++++++++++++++++++++
 .../authentication/AuthenticationHandler.scala     | 110 +++++++++++
 .../BasicAuthenticationHandler.scala               |  97 ++++++++++
 .../BearerAuthenticationHandler.scala              |  89 +++++++++
 .../authentication/HttpAuthenticationFactory.scala |  92 +++++++++
 .../SpnegoAuthenticationHandler.scala              | 165 +++++++++++++++++
 .../http/ApiBaseResourceAuthenticationSuite.scala  | 119 ++++++++++++
 .../server/common/http/ApiBaseResourceSuite.scala  |  44 ++---
 .../server/common/http/HttpTestHelper.scala        |   1 +
 ...rDefinePasswordAuthenticationProviderImpl.scala |  41 +++++
 ...UserDefineTokenAuthenticationProviderImpl.scala |  40 ++++
 .../worker/http/api/ApiWorkerResourceSuite.scala   |   9 +-
 .../ApiWorkerResourceAuthenticationSuite.scala     |  43 +++++
 25 files changed, 1484 insertions(+), 28 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 690758b77..742c16444 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.util.Try
 
-import org.apache.celeborn.common.CelebornConf.MASTER_INTERNAL_ENDPOINTS
+import 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
 import org.apache.celeborn.common.identity.{DefaultIdentityProvider, 
IdentityProvider}
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.internal.config._
@@ -2193,6 +2193,71 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("30s")
 
+  val MASTER_HTTP_AUTH_SUPPORTED_SCHEMES: ConfigEntry[Seq[String]] =
+    buildConf("celeborn.master.http.auth.supportedSchemes")
+      .categories("master")
+      .version("0.6.0")
+      .doc("A comma-separated list of master http auth supported schemes." +
+        "<ul>" +
+        " <li>SPNEGO: Kerberos/GSSAPI authentication.</li>" +
+        " <li>BASIC: User-defined password authentication, the concreted 
implementation is" +
+        " configurable via `celeborn.master.http.auth.basic.provider`.</li>" +
+        " <li>BEARER: User-defined bearer token authentication, the concreted 
implementation is" +
+        " configurable via `celeborn.master.http.auth.bearer.provider`.</li>" +
+        "</ul>")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
+  val MASTER_HTTP_SPNEGO_KEYTAB: OptionalConfigEntry[String] =
+    buildConf("celeborn.master.http.spnego.keytab")
+      .categories("master")
+      .version("0.6.0")
+      .doc("The keytab file for SPNego authentication.")
+      .stringConf
+      .createOptional
+
+  val MASTER_HTTP_SPNEGO_PRINCIPAL: OptionalConfigEntry[String] =
+    buildConf("celeborn.master.http.spnego.principal")
+      .categories("master")
+      .version("0.6.0")
+      .doc("SPNego service principal, typical value would look like 
HTTP/[email protected]." +
+        " SPNego service principal would be used when celeborn http 
authentication is enabled." +
+        " This needs to be set only if SPNEGO is to be used in 
authentication.")
+      .stringConf
+      .createOptional
+
+  val MASTER_HTTP_PROXY_CLIENT_IP_HEADER: ConfigEntry[String] =
+    buildConf("celeborn.master.http.proxy.client.ip.header")
+      .categories("master")
+      .doc("The HTTP header to record the real client IP address. If your 
server is behind a load" +
+        " balancer or other proxy, the server will see this load balancer or 
proxy IP address as" +
+        " the client IP address, to get around this common issue, most load 
balancers or proxies" +
+        " offer the ability to record the real remote IP address in an HTTP 
header that will be" +
+        " added to the request for other devices to use. Note that, because 
the header value can" +
+        " be specified to any IP address, so it will not be used for 
authentication.")
+      .version("0.6.0")
+      .stringConf
+      .createWithDefault("X-Real-IP")
+
+  val MASTER_HTTP_AUTH_BASIC_PROVIDER: ConfigEntry[String] =
+    buildConf("celeborn.master.http.auth.basic.provider")
+      .categories("master")
+      .version("0.6.0")
+      .doc("User-defined password authentication implementation of " +
+        
"org.apache.celeborn.common.authentication.PasswdAuthenticationProvider")
+      .stringConf
+      .createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
+
+  val MASTER_HTTP_AUTH_BEARER_PROVIDER: ConfigEntry[String] =
+    buildConf("celeborn.master.http.auth.bearer.provider")
+      .categories("master")
+      .version("0.6.0")
+      .doc("User-defined token authentication implementation of " +
+        
"org.apache.celeborn.common.authentication.TokenAuthenticationProvider")
+      .stringConf
+      .createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
+
   val HA_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.master.ha.enabled")
       .withAlternative("celeborn.ha.enabled")
@@ -2795,6 +2860,71 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("30s")
 
+  val WORKER_HTTP_AUTH_SUPPORTED_SCHEMES: ConfigEntry[Seq[String]] =
+    buildConf("celeborn.worker.http.auth.supportedSchemes")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("A comma-separated list of worker http auth supported schemes." +
+        "<ul>" +
+        " <li>SPNEGO: Kerberos/GSSAPI authentication.</li>" +
+        " <li>BASIC: User-defined password authentication, the concreted 
implementation is" +
+        " configurable via `celeborn.worker.http.auth.basic.provider`.</li>" +
+        " <li>BEARER: User-defined bearer token authentication, the concreted 
implementation is" +
+        " configurable via `celeborn.worker.http.auth.bearer.provider`.</li>" +
+        "</ul>")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
+  val WORKER_HTTP_SPNEGO_KEYTAB: OptionalConfigEntry[String] =
+    buildConf("celeborn.worker.http.spnego.keytab")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("The keytab file for SPNego authentication.")
+      .stringConf
+      .createOptional
+
+  val WORKER_HTTP_SPNEGO_PRINCIPAL: OptionalConfigEntry[String] =
+    buildConf("celeborn.worker.http.spnego.principal")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("SPNego service principal, typical value would look like 
HTTP/[email protected]." +
+        " SPNego service principal would be used when celeborn http 
authentication is enabled." +
+        " This needs to be set only if SPNEGO is to be used in 
authentication.")
+      .stringConf
+      .createOptional
+
+  val WORKER_HTTP_PROXY_CLIENT_IP_HEADER: ConfigEntry[String] =
+    buildConf("celeborn.worker.http.proxy.client.ip.header")
+      .categories("worker")
+      .doc("The HTTP header to record the real client IP address. If your 
server is behind a load" +
+        " balancer or other proxy, the server will see this load balancer or 
proxy IP address as" +
+        " the client IP address, to get around this common issue, most load 
balancers or proxies" +
+        " offer the ability to record the real remote IP address in an HTTP 
header that will be" +
+        " added to the request for other devices to use. Note that, because 
the header value can" +
+        " be specified to any IP address, so it will not be used for 
authentication.")
+      .version("0.6.0")
+      .stringConf
+      .createWithDefault("X-Real-IP")
+
+  val WORKER_HTTP_AUTH_BASIC_PROVIDER: ConfigEntry[String] =
+    buildConf("celeborn.worker.http.auth.basic.provider")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("User-defined password authentication implementation of " +
+        
"org.apache.celeborn.common.authentication.PasswdAuthenticationProvider")
+      .stringConf
+      .createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
+
+  val WORKER_HTTP_AUTH_BEARER_PROVIDER: ConfigEntry[String] =
+    buildConf("celeborn.worker.http.auth.bearer.provider")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("User-defined token authentication implementation of " +
+        
"org.apache.celeborn.common.authentication.TokenAuthenticationProvider")
+      .stringConf
+      .createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
+
   val WORKER_RPC_PORT: ConfigEntry[Int] =
     buildConf("celeborn.worker.rpc.port")
       .categories("worker")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/authentication/AnonymousAuthenticationProviderImpl.scala
 
b/common/src/main/scala/org/apache/celeborn/common/authentication/AnonymousAuthenticationProviderImpl.scala
new file mode 100644
index 000000000..de38c1b30
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/authentication/AnonymousAuthenticationProviderImpl.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.authentication
+
+import java.security.Principal
+
+/**
+ * Authentication provider that performs no credential check at all.
+ *
+ * It serves as both a [[PasswdAuthenticationProvider]] and a [[TokenAuthenticationProvider]]:
+ * every user/password pair and every token is accepted.
+ */
+class AnonymousAuthenticationProviderImpl
+  extends PasswdAuthenticationProvider
+  with TokenAuthenticationProvider {
+
+  /** Accepts any credentials; the password is ignored and the given user becomes the principal. */
+  override def authenticate(user: String, password: String): Principal =
+    new BasicPrincipal(user)
+
+  /** Accepts any token; the token is ignored and the principal is always "anonymous". */
+  override def authenticate(token: String): Principal =
+    new BasicPrincipal("anonymous")
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/authentication/BasicPrincipal.scala
 
b/common/src/main/scala/org/apache/celeborn/common/authentication/BasicPrincipal.scala
new file mode 100644
index 000000000..750677cf2
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/authentication/BasicPrincipal.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.authentication
+
+import java.security.Principal
+import java.util.Objects
+
+/**
+ * A [[java.security.Principal]] that is identified solely by its name.
+ *
+ * Two instances are equal when they have the same runtime class and the same name;
+ * `hashCode` is derived from the name so it stays consistent with `equals`.
+ *
+ * @param name the principal name; a `null` name is rejected with a `NullPointerException`
+ */
+class BasicPrincipal(val name: String) extends Principal {
+  // Fail fast at construction so getName/toString can never return null.
+  Objects.requireNonNull(name, "Principal name cannot be null")
+
+  override def getName: String = name
+
+  override def toString: String = name
+
+  override def hashCode(): Int = Objects.hash(name)
+
+  /**
+   * Compares by runtime class and name.
+   *
+   * @param o the object to compare with; `null` and non-principal values yield `false`
+   * @return `true` if `o` is this instance or an instance of the same class with the same name
+   */
+  override def equals(o: Any): Boolean = o match {
+    case that: BasicPrincipal =>
+      // Identity must be tested with `eq`: in Scala `this == o` dispatches back to `equals`,
+      // which would recurse until the stack overflows when two distinct instances are compared.
+      (this eq that) || (getClass == that.getClass && name == that.name)
+    case _ =>
+      // Covers null as well as values of unrelated types.
+      false
+  }
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/authentication/HttpAuthSchemes.scala
 
b/common/src/main/scala/org/apache/celeborn/common/authentication/HttpAuthSchemes.scala
new file mode 100644
index 000000000..0a4bd479a
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/authentication/HttpAuthSchemes.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.authentication
+
+/**
+ * Enumeration of the HTTP authentication schemes: NEGOTIATE, BASIC and BEARER.
+ */
+object HttpAuthSchemes extends Enumeration {
+  type HttpAuthScheme = Value
+
+  // Declaration order determines the ids: NEGOTIATE = 0, BASIC = 1, BEARER = 2.
+  val NEGOTIATE: HttpAuthScheme = Value
+  val BASIC: HttpAuthScheme = Value
+  val BEARER: HttpAuthScheme = Value
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/authentication/PasswdAuthenticationProvider.scala
 
b/common/src/main/scala/org/apache/celeborn/common/authentication/PasswdAuthenticationProvider.scala
new file mode 100644
index 000000000..71aaad40c
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/authentication/PasswdAuthenticationProvider.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.authentication
+
+import java.security.Principal
+
+trait PasswdAuthenticationProvider {
+
+  /**
+   * The authenticate method is called by the celeborn authentication layer
+   * to authenticate the user & password supplied with a request.
+   * If the user is to be granted access, return the [[Principal]] for that user
+   * without throwing.
+   * When the user is to be disallowed, throw an appropriate [[SecurityException]].
+   *
+   * @param user     The username received over the connection request
+   * @param password The password received over the connection request
+   * @return The principal associated with the authenticated user
+   *
+   * @throws SecurityException When a user is found to be invalid by the implementation
+   */
+  @throws[SecurityException]
+  def authenticate(user: String, password: String): Principal
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/authentication/TokenAuthenticationProvider.scala
 
b/common/src/main/scala/org/apache/celeborn/common/authentication/TokenAuthenticationProvider.scala
new file mode 100644
index 000000000..81729f26c
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/authentication/TokenAuthenticationProvider.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.authentication
+
+import java.security.Principal
+
+trait TokenAuthenticationProvider {
+
+  /**
+   * The authenticate method is called by the celeborn authentication layer
+   * to authenticate the token supplied with a request.
+   * If the token is to be accepted, return the [[Principal]] associated with it
+   * without throwing.
+   * When the token is to be disallowed, throw an appropriate [[SecurityException]].
+   *
+   * @param token The token received over the connection request.
+   * @return The identifier associated with the token
+   *
+   * @throws SecurityException When the token is found to be invalid by the implementation
+   */
+  @throws[SecurityException]
+  def authenticate(token: String): Principal
+}
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 97aae4333..efa455451 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -43,10 +43,16 @@ license: |
 | celeborn.master.heartbeat.application.timeout | 300s | false | Application 
heartbeat timeout. | 0.3.0 | celeborn.application.heartbeat.timeout | 
 | celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat 
timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout | 
 | celeborn.master.host | &lt;localhost&gt; | false | Hostname for master to 
bind. | 0.2.0 |  | 
+| celeborn.master.http.auth.basic.provider | 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | 
false | User-defined password authentication implementation of 
org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0 
|  | 
+| celeborn.master.http.auth.bearer.provider | 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | 
false | User-defined token authentication implementation of 
org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 | 
 | 
+| celeborn.master.http.auth.supportedSchemes |  | false | A comma-separated 
list of master http auth supported schemes.<ul> <li>SPNEGO: Kerberos/GSSAPI 
authentication.</li> <li>BASIC: User-defined password authentication, the 
concreted implementation is configurable via 
`celeborn.master.http.auth.basic.provider`.</li> <li>BEARER: User-defined 
bearer token authentication, the concreted implementation is configurable via 
`celeborn.master.http.auth.bearer.provider`.</li></ul> | 0.6.0 |  | 
 | celeborn.master.http.host | &lt;localhost&gt; | false | Master's http host. 
| 0.4.0 | 
celeborn.metrics.master.prometheus.host,celeborn.master.metrics.prometheus.host 
| 
 | celeborn.master.http.idleTimeout | 30s | false | Master http server idle 
timeout. | 0.5.0 |  | 
 | celeborn.master.http.maxWorkerThreads | 200 | false | Maximum number of 
threads in the master http worker thread pool. | 0.5.0 |  | 
 | celeborn.master.http.port | 9098 | false | Master's http port. | 0.4.0 | 
celeborn.metrics.master.prometheus.port,celeborn.master.metrics.prometheus.port 
| 
+| celeborn.master.http.proxy.client.ip.header | X-Real-IP | false | The HTTP 
header to record the real client IP address. If your server is behind a load 
balancer or other proxy, the server will see this load balancer or proxy IP 
address as the client IP address, to get around this common issue, most load 
balancers or proxies offer the ability to record the real remote IP address in 
an HTTP header that will be added to the request for other devices to use. Note 
that, because the header v [...]
+| celeborn.master.http.spnego.keytab | &lt;undefined&gt; | false | The keytab 
file for SPNego authentication. | 0.6.0 |  | 
+| celeborn.master.http.spnego.principal | &lt;undefined&gt; | false | SPNego 
service principal, typical value would look like HTTP/[email protected]. SPNego 
service principal would be used when celeborn http authentication is enabled. 
This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 |  
| 
 | celeborn.master.http.stopTimeout | 5s | false | Master http server stop 
timeout. | 0.5.0 |  | 
 | celeborn.master.internal.port | 8097 | false | Internal port on the master 
where both workers and other master nodes connect. | 0.5.0 |  | 
 | celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 |  | 
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 30d27f780..ef15cd7d0 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -86,10 +86,16 @@ license: |
 | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s | 
false | Interval for a Celeborn worker to flush committed file infos into Level 
DB. | 0.3.1 |  | 
 | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false 
| Whether to call sync method to save committed file infos into Level DB to 
handle OS crash. | 0.3.1 |  | 
 | celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's 
graceful shutdown timeout time. | 0.2.0 |  | 
+| celeborn.worker.http.auth.basic.provider | 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | 
false | User-defined password authentication implementation of 
org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0 
|  | 
+| celeborn.worker.http.auth.bearer.provider | 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | 
false | User-defined token authentication implementation of 
org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 | 
 | 
+| celeborn.worker.http.auth.supportedSchemes |  | false | A comma-separated 
list of worker http auth supported schemes.<ul> <li>SPNEGO: Kerberos/GSSAPI 
authentication.</li> <li>BASIC: User-defined password authentication, the 
concreted implementation is configurable via 
`celeborn.worker.http.auth.basic.provider`.</li> <li>BEARER: User-defined 
bearer token authentication, the concreted implementation is configurable via 
`celeborn.worker.http.auth.bearer.provider`.</li></ul> | 0.6.0 |  | 
 | celeborn.worker.http.host | &lt;localhost&gt; | false | Worker's http host. 
| 0.4.0 | 
celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host 
| 
 | celeborn.worker.http.idleTimeout | 30s | false | Worker http server idle 
timeout. | 0.5.0 |  | 
 | celeborn.worker.http.maxWorkerThreads | 200 | false | Maximum number of 
threads in the worker http worker thread pool. | 0.5.0 |  | 
 | celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 | 
celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port 
| 
+| celeborn.worker.http.proxy.client.ip.header | X-Real-IP | false | The HTTP 
header to record the real client IP address. If your server is behind a load 
balancer or other proxy, the server will see this load balancer or proxy IP 
address as the client IP address, to get around this common issue, most load 
balancers or proxies offer the ability to record the real remote IP address in 
an HTTP header that will be added to the request for other devices to use. Note 
that, because the header v [...]
+| celeborn.worker.http.spnego.keytab | &lt;undefined&gt; | false | The keytab 
file for SPNego authentication. | 0.6.0 |  | 
+| celeborn.worker.http.spnego.principal | &lt;undefined&gt; | false | SPNego 
service principal, typical value would look like HTTP/[email protected]. SPNego 
service principal would be used when celeborn http authentication is enabled. 
This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 |  
| 
 | celeborn.worker.http.stopTimeout | 5s | false | Worker http server stop 
timeout. | 0.5.0 |  | 
 | celeborn.worker.internal.port | 0 | false | Internal server port on the 
Worker where the master nodes connect. | 0.5.0 |  | 
 | celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling 
via async_profiler in workers. | 0.5.0 |  | 
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
new file mode 100644
index 000000000..27907f5f4
--- /dev/null
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master
+
+import com.google.common.io.Files
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.server.common.HttpService
+import 
org.apache.celeborn.server.common.http.ApiBaseResourceAuthenticationSuite
+
+/**
+ * Runs the test cases inherited from [[ApiBaseResourceAuthenticationSuite]] against a
+ * non-HA [[Master]] that is started inside the test JVM.
+ */
+class ApiMasterResourceAuthenticationSuite extends ApiBaseResourceAuthenticationSuite {
+  private var master: Master = _
+
+  override protected def httpService: HttpService = master
+
+  /** Creates a temporary directory, marks it for deletion on JVM exit and returns its path. */
+  def getTmpDir(): String = {
+    val dir = Files.createTempDir()
+    dir.deleteOnExit()
+    dir.getAbsolutePath
+  }
+
+  override def beforeAll(): Unit = {
+    val rpcPort = Utils.selectRandomPort(1024, 65535)
+    // The HTTP endpoint uses the port right after the randomly selected RPC port.
+    val httpPort = rpcPort + 1
+
+    Seq(
+      CelebornConf.HA_ENABLED.key -> "false",
+      CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key -> getTmpDir(),
+      CelebornConf.WORKER_STORAGE_DIRS.key -> getTmpDir(),
+      CelebornConf.MASTER_HTTP_HOST.key -> "127.0.0.1",
+      CelebornConf.MASTER_HTTP_PORT.key -> httpPort.toString).foreach {
+      case (key, value) => celebornConf.set(key, value)
+    }
+
+    val masterArgs =
+      new MasterArguments(Array("-h", "localhost", "-p", rpcPort.toString), celebornConf)
+    master = new Master(celebornConf, masterArgs)
+
+    // Initialize the master on its own thread so that beforeAll can continue.
+    new Thread(new Runnable {
+      override def run(): Unit = master.initialize()
+    }).start()
+
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+    master.rpcEnv.shutdown()
+  }
+}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 6779d1249..87a621320 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -18,14 +18,18 @@
 package org.apache.celeborn.server.common
 
 import java.util
+import javax.servlet.DispatcherType
 
 import scala.collection.JavaConverters._
 
+import org.eclipse.jetty.servlet.FilterHolder
+
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.util.Utils
 import org.apache.celeborn.server.common.http.HttpServer
 import org.apache.celeborn.server.common.http.api.ApiRootResource
+import 
org.apache.celeborn.server.common.http.authentication.{AuthenticationFilter, 
HttpAuthenticationFactory}
 import org.apache.celeborn.server.common.service.config.ConfigLevel
 
 abstract class HttpService extends Service with Logging {
@@ -253,11 +257,16 @@ abstract class HttpService extends Service with Logging {
   }
 
   protected def startInternal(): Unit = {
-    httpServer.addHandler(ApiRootResource.getServletHandler(this))
+    val contextHandler = ApiRootResource.getServletHandler(this)
+    val holder = new FilterHolder(new AuthenticationFilter(conf, serviceName))
+    contextHandler.addFilter(holder, "/*", 
util.EnumSet.allOf(classOf[DispatcherType]))
+    
httpServer.addHandler(HttpAuthenticationFactory.wrapHandler(contextHandler))
+
     
httpServer.addStaticHandler("META-INF/resources/webjars/swagger-ui/4.9.1/", 
"/swagger-static/")
     httpServer.addStaticHandler("org/apache/celeborn/swagger", "/swagger")
     httpServer.addRedirectHandler("/help", "/swagger")
     httpServer.addRedirectHandler("/docs", "/swagger")
+
     if (metricsSystem.running) {
       metricsSystem.getServletContextHandlers.foreach { handler =>
         httpServer.addHandler(handler)
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpAuthUtils.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpAuthUtils.scala
new file mode 100644
index 000000000..c475545c7
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpAuthUtils.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+/**
+ * Names of the HTTP headers exchanged between client and server during an
+ * authentication sequence.
+ */
+object HttpAuthUtils {
+  // HTTP header used by the server endpoint during an authentication sequence.
+  val WWW_AUTHENTICATE_HEADER = "WWW-Authenticate"
+  // HTTP header used by the client endpoint during an authentication sequence.
+  val AUTHORIZATION_HEADER = "Authorization"
+}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationAuditLogger.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationAuditLogger.scala
new file mode 100644
index 000000000..614719daf
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationAuditLogger.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.celeborn.common.internal.Logging
+import 
org.apache.celeborn.server.common.http.authentication.AuthenticationFilter._
+
+/**
+ * Emits one tab-separated audit record per HTTP request, combining the per-thread values
+ * exposed by [[AuthenticationFilter]] with details read from the request and the response.
+ */
+object AuthenticationAuditLogger extends Logging {
+  // Scratch buffer kept per thread; it is reset at the start of every audit call.
+  final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
+    override protected def initialValue: StringBuilder = new StringBuilder()
+  }
+
+  /**
+   * Logs the audit record for the given exchange through `logInfo`.
+   *
+   * @param request  the HTTP request; method, URI, query string and protocol are recorded.
+   * @param response the HTTP response; its current status code is recorded.
+   */
+  def audit(request: HttpServletRequest, response: HttpServletResponse): Unit = {
+    // Fields are listed in output order and joined with a single tab.
+    val fields = Seq(
+      s"user=${HTTP_CLIENT_IDENTIFIER.get()}(auth:${HTTP_AUTH_TYPE.get()})",
+      s"ip=${HTTP_CLIENT_IP_ADDRESS.get()}",
+      s"proxyIp=${HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.get()}",
+      s"method=${request.getMethod}",
+      s"uri=${request.getRequestURI}",
+      s"params=${request.getQueryString}",
+      s"protocol=${request.getProtocol}",
+      s"status=${response.getStatus}")
+    val line = AUDIT_BUFFER.get()
+    line.setLength(0)
+    fields.addString(line, "\t")
+    logInfo(line.toString())
+  }
+}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationFilter.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationFilter.scala
new file mode 100644
index 000000000..650ad268e
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationFilter.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.io.IOException
+import javax.security.sasl.AuthenticationException
+import javax.servlet.{Filter, FilterChain, FilterConfig, ServletException, 
ServletRequest, ServletResponse}
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import scala.collection.mutable
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.authentication.HttpAuthSchemes
+import 
org.apache.celeborn.common.authentication.HttpAuthSchemes.{HttpAuthScheme, _}
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.server.common.Service
+import 
org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
+
+/**
+ * Servlet filter that authenticates incoming HTTP requests with the handlers configured for
+ * the given service and publishes the outcome through the thread-locals declared in the
+ * companion object.
+ *
+ * @param conf        the configuration used to select and initialize authentication handlers.
+ * @param serviceName the service this filter protects; only [[Service.MASTER]] and
+ *                    [[Service.WORKER]] are handled.
+ */
+class AuthenticationFilter(conf: CelebornConf, serviceName: String) extends Filter with Logging {
+  import AuthenticationFilter._
+
+  // Registered handlers, keyed by scheme; at most one handler per scheme.
+  private[authentication] val authSchemeHandlers =
+    new mutable.HashMap[HttpAuthScheme, AuthenticationHandler]()
+
+  /**
+   * Initializes the handler and registers it when it reports itself as supported and no
+   * handler is registered yet for the same scheme; otherwise only a warning is logged.
+   */
+  private[authentication] def addAuthHandler(authHandler: AuthenticationHandler): Unit = {
+    authHandler.init(conf)
+    if (authHandler.authenticationSupported) {
+      if (authSchemeHandlers.contains(authHandler.authScheme)) {
+        logWarning(s"Authentication handler has been defined for scheme ${authHandler.authScheme}")
+      } else {
+        logInfo(s"Add authentication handler ${authHandler.getClass.getSimpleName}" +
+          s" for scheme ${authHandler.authScheme}")
+        authSchemeHandlers.put(authHandler.authScheme, authHandler)
+      }
+    } else {
+      logWarning(s"The authentication handler ${authHandler.getClass.getSimpleName}" +
+        s" for scheme ${authHandler.authScheme} is not supported")
+    }
+  }
+
+  // Schemes enabled for this service, parsed from the service-specific configuration entry.
+  private val authSchemes: Seq[HttpAuthScheme] = serviceName match {
+    case Service.MASTER =>
+      conf.get(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES).map(HttpAuthSchemes.withName)
+    case Service.WORKER =>
+      conf.get(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES).map(HttpAuthSchemes.withName)
+  }
+  // Name of the request header whose value is recorded as the proxied client address.
+  private val proxyClientIpHeader: String = serviceName match {
+    case Service.MASTER => conf.get(CelebornConf.MASTER_HTTP_PROXY_CLIENT_IP_HEADER)
+    case Service.WORKER => conf.get(CelebornConf.WORKER_HTTP_PROXY_CLIENT_IP_HEADER)
+  }
+
+  /** Creates and registers one handler for every scheme enabled in the configuration. */
+  private def initAuthHandlers(): Unit = {
+    if (authSchemes.contains(HttpAuthSchemes.NEGOTIATE)) {
+      serviceName match {
+        case Service.MASTER =>
+          addAuthHandler(new SpnegoAuthenticationHandler(
+            conf.get(CelebornConf.MASTER_HTTP_SPNEGO_KEYTAB).getOrElse(""),
+            conf.get(CelebornConf.MASTER_HTTP_SPNEGO_PRINCIPAL).getOrElse("")))
+        case Service.WORKER =>
+          addAuthHandler(new SpnegoAuthenticationHandler(
+            conf.get(CelebornConf.WORKER_HTTP_SPNEGO_KEYTAB).getOrElse(""),
+            conf.get(CelebornConf.WORKER_HTTP_SPNEGO_PRINCIPAL).getOrElse("")))
+      }
+    }
+    if (authSchemes.contains(HttpAuthSchemes.BASIC)) {
+      serviceName match {
+        case Service.MASTER =>
+          addAuthHandler(new BasicAuthenticationHandler(
+            conf.get(CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER)))
+        case Service.WORKER =>
+          addAuthHandler(new BasicAuthenticationHandler(
+            conf.get(CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER)))
+      }
+    }
+    if (authSchemes.contains(HttpAuthSchemes.BEARER)) {
+      serviceName match {
+        case Service.MASTER =>
+          addAuthHandler(new BearerAuthenticationHandler(
+            conf.get(CelebornConf.MASTER_HTTP_AUTH_BEARER_PROVIDER)))
+        case Service.WORKER =>
+          addAuthHandler(new BearerAuthenticationHandler(
+            conf.get(CelebornConf.WORKER_HTTP_AUTH_BEARER_PROVIDER)))
+      }
+    }
+  }
+
+  override def init(filterConfig: FilterConfig): Unit = {
+    initAuthHandlers()
+  }
+
+  /** Returns a registered handler whose scheme matches the given Authorization value. */
+  private[celeborn] def getMatchedHandler(authorization: String): Option[AuthenticationHandler] = {
+    authSchemeHandlers.values.find(_.matchAuthScheme(authorization))
+  }
+
+  /**
+   * If the request has a valid authentication token it allows the request to continue to the
+   * target resource, otherwise it triggers an authentication sequence using the configured
+   * [[AuthenticationHandler]].
+   * <p>
+   * The request is forwarded only when the matched handler returns a non-null user. A null
+   * user means the handler has not completed authentication and has already written its
+   * challenge onto the response, so the filter chain is not invoked.
+   *
+   * @param request     the request object.
+   * @param response    the response object.
+   * @param filterChain the filter chain object.
+   * @throws IOException      thrown if an IO error occurred.
+   * @throws ServletException thrown if a processing error occurred.
+   */
+  override def doFilter(
+      request: ServletRequest,
+      response: ServletResponse,
+      filterChain: FilterChain): Unit = {
+    val httpRequest = request.asInstanceOf[HttpServletRequest]
+    val httpResponse = response.asInstanceOf[HttpServletResponse]
+
+    if (authSchemeHandlers.isEmpty || BYPASS_API_PATHS.contains(httpRequest.getRequestURI)) {
+      // No handler is registered or the path is exempt: pass the request through untouched.
+      filterChain.doFilter(request, response)
+    } else {
+      val authorization = httpRequest.getHeader(AUTHORIZATION_HEADER)
+      val matchedHandler = getMatchedHandler(authorization)
+      HTTP_CLIENT_IP_ADDRESS.set(httpRequest.getRemoteAddr)
+      HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.set(httpRequest.getHeader(proxyClientIpHeader))
+
+      try {
+        matchedHandler match {
+          case None =>
+            logDebug(s"No auth scheme matched for url: ${httpRequest.getRequestURL}")
+            httpResponse.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+            httpResponse.sendError(
+              HttpServletResponse.SC_UNAUTHORIZED,
+              s"No auth scheme matched for $authorization")
+          case Some(handler) =>
+            HTTP_AUTH_TYPE.set(handler.authScheme.toString)
+            val authUser = handler.authenticate(httpRequest, httpResponse)
+            HTTP_CLIENT_IDENTIFIER.set(authUser)
+            // Forward only fully authenticated requests; forwarding on a null user would let
+            // an unauthenticated request reach the protected resource.
+            if (authUser != null) {
+              doFilter(filterChain, httpRequest, httpResponse)
+            }
+        }
+      } catch {
+        case e: AuthenticationException =>
+          httpResponse.setStatus(HttpServletResponse.SC_FORBIDDEN)
+          HTTP_CLIENT_IDENTIFIER.remove()
+          HTTP_CLIENT_IP_ADDRESS.remove()
+          HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.remove()
+          HTTP_AUTH_TYPE.remove()
+          httpResponse.sendError(HttpServletResponse.SC_FORBIDDEN, e.getMessage)
+      } finally {
+        AuthenticationAuditLogger.audit(httpRequest, httpResponse)
+      }
+    }
+  }
+
+  /**
+   * Delegates call to the servlet filter chain. Sub-classes may override this
+   * method to perform pre and post tasks.
+   *
+   * @param filterChain the filter chain object.
+   * @param request     the request object.
+   * @param response    the response object.
+   * @throws IOException      thrown if an IO error occurred.
+   * @throws ServletException thrown if a processing error occurred.
+   */
+  @throws[IOException]
+  @throws[ServletException]
+  protected def doFilter(
+      filterChain: FilterChain,
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    filterChain.doFilter(request, response)
+  }
+
+  /** Destroys every registered handler and empties the registry. */
+  override def destroy(): Unit = {
+    if (authSchemeHandlers.nonEmpty) {
+      authSchemeHandlers.values.foreach(_.destroy())
+      authSchemeHandlers.clear()
+    }
+  }
+}
+
+/**
+ * Shared state of [[AuthenticationFilter]]: the request URIs exempt from authentication and
+ * the per-thread holders describing the request currently being processed.
+ */
+object AuthenticationFilter {
+  // Request URIs that are served without authentication.
+  private val BYPASS_API_PATHS = Set("/openapi.json", "/openapi.yaml")
+
+  // Remote address of the client connection.
+  final val HTTP_CLIENT_IP_ADDRESS = nullInitializedHolder()
+  // Value of the configured proxy client-IP request header.
+  final val HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS = nullInitializedHolder()
+  // User name returned by the matched authentication handler.
+  final val HTTP_CLIENT_IDENTIFIER = nullInitializedHolder()
+  // Name of the authentication scheme that matched the request.
+  final val HTTP_AUTH_TYPE = nullInitializedHolder()
+
+  /** Creates a thread-local string holder whose initial value is null. */
+  private def nullInitializedHolder(): ThreadLocal[String] =
+    new ThreadLocal[String]() {
+      override protected def initialValue(): String = null
+    }
+}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationHandler.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationHandler.scala
new file mode 100644
index 000000000..8e4076536
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationHandler.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import javax.security.sasl.AuthenticationException
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.authentication.HttpAuthSchemes.HttpAuthScheme
+import 
org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
+
+/**
+ * Contract for one HTTP authentication scheme. An implementation recognizes Authorization
+ * header values of its scheme and authenticates the requests that carry them.
+ */
+trait AuthenticationHandler {
+
+  /**
+   * HTTP header prefix used during the authentication sequence.
+   */
+  val authScheme: HttpAuthScheme
+
+  /**
+   *  Initializes the authentication handler instance.
+   */
+  def init(conf: CelebornConf): Unit
+
+  /**
+   * Whether this authentication handler is configured well and support authentication.
+   */
+  def authenticationSupported: Boolean
+
+  /**
+   * Destroys the authentication handler instance.
+   * <p>
+   * This method is invoked by the [[AuthenticationFilter.destroy]] method.
+   */
+  def destroy(): Unit
+
+  /**
+   * Performs an authentication step for the given HTTP client request.
+   * <p>
+   * This method is invoked by the [[AuthenticationFilter]] only if the HTTP client request is
+   * not yet authenticated.
+   * <p>
+   * Depending upon the authentication mechanism being implemented, a particular HTTP client may
+   * end up making a sequence of invocations before authentication is successfully established
+   * (this is the case of Kerberos SPNEGO).
+   * <p>
+   * This method must return a user name only if the HTTP client request has been successfully
+   * and fully authenticated.
+   * <p>
+   * If the HTTP client request has not been completely authenticated, this method must take over
+   * the corresponding HTTP response and it must return <code>null</code>.
+   *
+   * @param request  the HTTP client request.
+   * @param response the HTTP client response.
+   * @return the user name
+   * @throws AuthenticationException thrown if an Authentication error occurred.
+   */
+  def authenticate(request: HttpServletRequest, response: HttpServletResponse): String
+
+  /**
+   * This method checks if the specified <code>authorization</code> belongs to the
+   * auth schema of current authentication handler.
+   * @param authorization Authentication header value which is to be compared with the
+   *                      authentication scheme.
+   * @return true when the trimmed value starts with the scheme name, ignoring case; false for
+   *         a null or empty value.
+   */
+  def matchAuthScheme(authorization: String): Boolean = {
+    if (authorization == null || authorization.isEmpty) {
+      false
+    } else {
+      val scheme = authScheme.toString
+      authorization.trim.regionMatches(true, 0, scheme, 0, scheme.length)
+    }
+  }
+
+  /**
+   * Get the decoded authorization value after auth scheme in Authentication header.
+   *
+   * @param request the HTTP client request.
+   * @return the non-empty payload that follows the scheme name.
+   * @throws AuthenticationException if the header is missing or empty, or carries no payload.
+   */
+  def getAuthorization(request: HttpServletRequest): String = {
+    val authHeader = request.getHeader(AUTHORIZATION_HEADER)
+    // each http request must have an Authorization header
+    if (authHeader == null || authHeader.isEmpty) {
+      throw new AuthenticationException("Authorization header received from the client is empty.")
+    }
+
+    // Trim before stripping the scheme so the offset agrees with matchAuthScheme, which matches
+    // on the trimmed header. drop() yields "" for a header shorter than the scheme name, so
+    // such a header is reported as "no data" below rather than failing on an index error.
+    val afterScheme = authHeader.trim.drop(authScheme.toString.length).trim
+    // For thrift http spnego authorization, its format is 'NEGOTIATE : $token', see HIVE-26353
+    val authorization =
+      if (afterScheme.startsWith(":")) afterScheme.stripPrefix(":").trim else afterScheme
+    // Authorization header must have a payload
+    if (authorization.isEmpty) {
+      throw new AuthenticationException(
+        "Authorization header received from the client does not contain any data.")
+    }
+    authorization
+  }
+}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BasicAuthenticationHandler.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BasicAuthenticationHandler.scala
new file mode 100644
index 000000000..72350443b
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BasicAuthenticationHandler.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.celeborn.common.CelebornConf
+import 
org.apache.celeborn.common.authentication.{AnonymousAuthenticationProviderImpl, 
PasswdAuthenticationProvider}
+import org.apache.celeborn.common.authentication.HttpAuthSchemes._
+import org.apache.celeborn.common.internal.Logging
+import 
org.apache.celeborn.server.common.http.HttpAuthUtils.{AUTHORIZATION_HEADER, 
WWW_AUTHENTICATE_HEADER}
+
+/**
+ * [[AuthenticationHandler]] for the BASIC scheme. Credentials are verified by the
+ * [[PasswdAuthenticationProvider]] implementation named by `providerClass`.
+ *
+ * @param providerClass fully qualified class name of the password authentication provider.
+ */
+class BasicAuthenticationHandler(providerClass: String) extends AuthenticationHandler with Logging {
+
+  private var conf: CelebornConf = _
+
+  // With the anonymous provider, requests without an Authorization header are accepted too.
+  private val allowAnonymous = classOf[AnonymousAuthenticationProviderImpl].getName == providerClass
+  override val authScheme: HttpAuthScheme = BASIC
+
+  override def init(conf: CelebornConf): Unit = {
+    this.conf = conf
+  }
+
+  /**
+   * Supported only when the provider class can be loaded and is a
+   * [[PasswdAuthenticationProvider]], so a misconfigured provider is rejected when the handler
+   * is registered instead of failing on every request.
+   */
+  override def authenticationSupported: Boolean = {
+    Option(providerClass).exists { className =>
+      try {
+        classOf[PasswdAuthenticationProvider].isAssignableFrom(Class.forName(className))
+      } catch {
+        // Any failure to load the class (including linkage errors) means "not supported".
+        case _: Throwable => false
+      }
+    }
+  }
+
+  /** A missing or empty header matches only when anonymous access is allowed. */
+  override def matchAuthScheme(authorization: String): Boolean = {
+    if (authorization == null || authorization.isEmpty) {
+      allowAnonymous
+    } else {
+      super.matchAuthScheme(authorization)
+    }
+  }
+
+  /** Returns "" for a missing or empty header when anonymous access is allowed. */
+  override def getAuthorization(request: HttpServletRequest): String = {
+    val authHeader = request.getHeader(AUTHORIZATION_HEADER)
+    if (allowAnonymous && (authHeader == null || authHeader.isEmpty)) {
+      ""
+    } else {
+      super.getAuthorization(request)
+    }
+  }
+
+  /**
+   * Decodes the Base64 `user:password` payload and authenticates it with the provider.
+   *
+   * @param request  the HTTP client request.
+   * @param response the HTTP client response; receives a 401 challenge when the user name or
+   *                 the password is missing, and status 200 on success.
+   * @return the authenticated user name, "anonymous" for anonymous access without a user
+   *         name, or null when the credentials are incomplete.
+   */
+  override def authenticate(
+      request: HttpServletRequest,
+      response: HttpServletResponse): String = {
+    val authorization = getAuthorization(request)
+    val inputToken = Option(authorization).map(a => Base64.getDecoder.decode(a.getBytes()))
+      .getOrElse(Array.empty[Byte])
+    // Split on the first colon only: the password may itself contain ':' characters.
+    val creds = new String(inputToken, StandardCharsets.UTF_8).split(":", 2)
+
+    if (allowAnonymous) {
+      creds.headOption.filterNot(_.isEmpty).getOrElse("anonymous")
+    } else if (creds.length < 2 || creds(0).trim.isEmpty || creds(1).trim.isEmpty) {
+      response.setHeader(WWW_AUTHENTICATE_HEADER, authScheme.toString)
+      response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+      null
+    } else {
+      val Array(user, password) = creds
+      val passwdAuthenticationProvider = HttpAuthenticationFactory
+        .getPasswordAuthenticationProvider(providerClass, conf)
+      val authUser = passwdAuthenticationProvider.authenticate(user, password).getName
+      response.setStatus(HttpServletResponse.SC_OK)
+      authUser
+    }
+  }
+
+  override def destroy(): Unit = {}
+}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BearerAuthenticationHandler.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BearerAuthenticationHandler.scala
new file mode 100644
index 000000000..63ae38c9d
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BearerAuthenticationHandler.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.celeborn.common.CelebornConf
+import 
org.apache.celeborn.common.authentication.{AnonymousAuthenticationProviderImpl, 
TokenAuthenticationProvider}
+import org.apache.celeborn.common.authentication.HttpAuthSchemes._
+import org.apache.celeborn.common.internal.Logging
+import 
org.apache.celeborn.server.common.http.HttpAuthUtils.{AUTHORIZATION_HEADER, 
WWW_AUTHENTICATE_HEADER}
+
+/**
+ * [[AuthenticationHandler]] for the BEARER scheme. Tokens are verified by the
+ * [[TokenAuthenticationProvider]] implementation named by `providerClass`.
+ *
+ * @param providerClass fully qualified class name of the token authentication provider.
+ */
+class BearerAuthenticationHandler(providerClass: String)
+  extends AuthenticationHandler with Logging {
+  private var conf: CelebornConf = _
+  // With the anonymous provider, requests without an Authorization header are accepted too.
+  private val allowAnonymous = classOf[AnonymousAuthenticationProviderImpl].getName == providerClass
+  override val authScheme: HttpAuthScheme = BEARER
+
+  override def init(conf: CelebornConf): Unit = {
+    this.conf = conf
+  }
+
+  /**
+   * Supported only when the provider class can be loaded and is a
+   * [[TokenAuthenticationProvider]], so a misconfigured provider is rejected when the handler
+   * is registered instead of failing on every request.
+   */
+  override def authenticationSupported: Boolean = {
+    Option(providerClass).exists { className =>
+      try {
+        classOf[TokenAuthenticationProvider].isAssignableFrom(Class.forName(className))
+      } catch {
+        // Any failure to load the class (including linkage errors) means "not supported".
+        case _: Throwable => false
+      }
+    }
+  }
+
+  /** A missing or empty header matches only when anonymous access is allowed. */
+  override def matchAuthScheme(authorization: String): Boolean = {
+    if (authorization == null || authorization.isEmpty) {
+      allowAnonymous
+    } else {
+      super.matchAuthScheme(authorization)
+    }
+  }
+
+  /** Returns "" for a missing or empty header when anonymous access is allowed. */
+  override def getAuthorization(request: HttpServletRequest): String = {
+    val authHeader = request.getHeader(AUTHORIZATION_HEADER)
+    if (allowAnonymous && (authHeader == null || authHeader.isEmpty)) {
+      ""
+    } else {
+      super.getAuthorization(request)
+    }
+  }
+
+  /**
+   * Base64-decodes the bearer token and authenticates it with the provider.
+   *
+   * @param request  the HTTP client request.
+   * @param response the HTTP client response; receives a 401 challenge when the decoded token
+   *                 is empty and anonymous access is not allowed, and status 200 on success.
+   * @return the name of the authenticated principal, or null when the token is missing.
+   */
+  override def authenticate(
+      request: HttpServletRequest,
+      response: HttpServletResponse): String = {
+    val inputToken = Option(getAuthorization(request))
+      .map(a => Base64.getDecoder.decode(a.getBytes()))
+      .getOrElse(Array.empty[Byte])
+
+    if (!allowAnonymous && inputToken.isEmpty) {
+      response.setHeader(WWW_AUTHENTICATE_HEADER, authScheme.toString)
+      response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+      null
+    } else {
+      val principal = HttpAuthenticationFactory
+        .getTokenAuthenticationProvider(providerClass, conf)
+        .authenticate(new String(inputToken, StandardCharsets.UTF_8)).getName
+      response.setStatus(HttpServletResponse.SC_OK)
+      principal
+    }
+  }
+
+  override def destroy(): Unit = {}
+}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/HttpAuthenticationFactory.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/HttpAuthenticationFactory.scala
new file mode 100644
index 000000000..aa7e05d0f
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/HttpAuthenticationFactory.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.eclipse.jetty.server.{Handler, Request}
+import org.eclipse.jetty.server.handler.HandlerWrapper
+
+import org.apache.celeborn.common.CelebornConf
+import 
org.apache.celeborn.common.authentication.{PasswdAuthenticationProvider, 
TokenAuthenticationProvider}
+import org.apache.celeborn.common.exception.CelebornException
+import org.apache.celeborn.reflect.DynConstructors
+
+/**
+ * Factory for the pieces shared by HTTP authentication: a Jetty handler wrapper that clears
+ * the per-request thread-locals of [[AuthenticationFilter]], and reflective construction of
+ * the configured authentication providers.
+ */
+object HttpAuthenticationFactory {
+
+  /**
+   * Wraps the handler so that the authentication thread-locals are removed after every
+   * request, whether the wrapped handler returns or throws. The wrapped handler is started
+   * after, and stopped before, the wrapper itself.
+   *
+   * @param handler the handler to delegate to.
+   * @return the wrapping handler.
+   */
+  def wrapHandler(handler: Handler): HandlerWrapper = {
+    new HandlerWrapper {
+      _handler = handler
+
+      override def handle(
+          target: String,
+          baseRequest: Request,
+          request: HttpServletRequest,
+          response: HttpServletResponse): Unit = {
+        try handler.handle(target, baseRequest, request, response)
+        finally clearRequestContext()
+      }
+
+      override def doStart(): Unit = {
+        super.doStart()
+        handler.start()
+      }
+
+      override def doStop(): Unit = {
+        handler.stop()
+        super.doStop()
+      }
+    }
+  }
+
+  /** Removes the values [[AuthenticationFilter]] keeps for the current thread. */
+  private def clearRequestContext(): Unit = {
+    Seq(
+      AuthenticationFilter.HTTP_CLIENT_IDENTIFIER,
+      AuthenticationFilter.HTTP_CLIENT_IP_ADDRESS,
+      AuthenticationFilter.HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS,
+      AuthenticationFilter.HTTP_AUTH_TYPE).foreach(_.remove())
+  }
+
+  /** Instantiates `providerClass` as a [[PasswdAuthenticationProvider]]. */
+  def getPasswordAuthenticationProvider(
+      providerClass: String,
+      conf: CelebornConf): PasswdAuthenticationProvider = {
+    createAuthenticationProvider(providerClass, classOf[PasswdAuthenticationProvider], conf)
+  }
+
+  /** Instantiates `providerClass` as a [[TokenAuthenticationProvider]]. */
+  def getTokenAuthenticationProvider(
+      providerClass: String,
+      conf: CelebornConf): TokenAuthenticationProvider = {
+    createAuthenticationProvider(providerClass, classOf[TokenAuthenticationProvider], conf)
+  }
+
+  /**
+   * Builds an instance of `className`, registering the constructor taking a [[CelebornConf]]
+   * first and the no-argument constructor second. Any exception raised along the way is
+   * rethrown wrapped in a [[CelebornException]].
+   */
+  private def createAuthenticationProvider[T](
+      className: String,
+      expected: Class[T],
+      conf: CelebornConf): T = {
+    try {
+      val constructor = DynConstructors.builder(expected)
+        .impl(className, classOf[CelebornConf])
+        .impl(className)
+        .buildChecked[T]()
+      constructor.newInstance(conf)
+    } catch {
+      case cause: Exception =>
+        throw new CelebornException(s"$className must extend of ${expected.getName}", cause)
+    }
+  }
+}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/SpnegoAuthenticationHandler.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/SpnegoAuthenticationHandler.scala
new file mode 100644
index 000000000..3862f62b2
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/SpnegoAuthenticationHandler.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.io.{File, IOException}
+import java.security.{PrivilegedActionException, PrivilegedExceptionAction}
+import java.util.Base64
+import javax.security.auth.Subject
+import javax.security.auth.kerberos.{KerberosPrincipal, KeyTab}
+import javax.security.sasl.AuthenticationException
+import javax.servlet.ServletException
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.hadoop.security.authentication.util.KerberosName
+import 
org.apache.hadoop.security.authentication.util.KerberosUtil.{getTokenServerName,
 GSS_KRB5_MECH_OID, GSS_SPNEGO_MECH_OID, NT_GSS_KRB5_PRINCIPAL_OID}
+import org.ietf.jgss.{GSSContext, GSSCredential, GSSManager, Oid}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.authentication.HttpAuthSchemes
+import org.apache.celeborn.common.authentication.HttpAuthSchemes._
+import org.apache.celeborn.common.internal.Logging
+import 
org.apache.celeborn.server.common.http.HttpAuthUtils.WWW_AUTHENTICATE_HEADER
+
+class SpnegoAuthenticationHandler(keytab: String, principal: String) extends 
AuthenticationHandler
+  with Logging {
+  private var gssManager: GSSManager = _
+  private var conf: CelebornConf = _
+  private var serverSubject = new Subject()
+
+  override val authScheme: HttpAuthScheme = HttpAuthSchemes.NEGOTIATE
+
+  /** SPNEGO is reported as supported only when both `keytab` and `principal` are non-empty. */
+  override def authenticationSupported: Boolean =
+    !(keytab.isEmpty || principal.isEmpty)
+
+  /**
+   * Prepares the server-side Kerberos state used to accept SPNEGO tokens.
+   *
+   * When [[authenticationSupported]] is false this only stores `conf`.
+   *
+   * @param conf Celeborn configuration; stored on the handler
+   * @throws ServletException if the keytab file does not exist or the principal does not
+   *                          start with "HTTP/"; failures while obtaining the GSSManager
+   *                          are rethrown either unwrapped (privileged-action failures)
+   *                          or wrapped in a ServletException
+   */
+  override def init(conf: CelebornConf): Unit = {
+    this.conf = conf
+    if (authenticationSupported) {
+      val keytabFile = new File(keytab)
+      if (!keytabFile.exists()) {
+        throw new ServletException(s"Keytab[$keytab] does not exists")
+      }
+      if (!principal.startsWith("HTTP/")) {
+        throw new ServletException(s"SPNEGO principal[$principal] does not 
start with HTTP/")
+      }
+
+      logInfo(s"Using keytab $keytab, for principal $principal")
+      // Attach the keytab and the principal to the subject that the doAs calls run as.
+      serverSubject.getPrivateCredentials().add(KeyTab.getInstance(keytabFile))
+      serverSubject.getPrincipals.add(new KerberosPrincipal(principal))
+
+      // TODO: support to config kerberos.name.rules and 
kerberos.rule.mechanism
+      // set default rules if no rules set, otherwise it will throw exception
+      // when parse the kerberos name
+      if (!KerberosName.hasRulesBeenSet) {
+        KerberosName.setRules("DEFAULT")
+      }
+
+      try {
+        // Obtain the GSSManager while running as the server subject.
+        gssManager = Subject.doAs(
+          serverSubject,
+          new PrivilegedExceptionAction[GSSManager] {
+            override def run(): GSSManager = {
+              GSSManager.getInstance()
+            }
+          })
+      } catch {
+        // Surface the exception raised inside the privileged action, not the wrapper.
+        case e: PrivilegedActionException => throw e.getException
+        case e: Exception => throw new ServletException(e)
+      }
+    }
+  }
+
+  /** Clears this handler's reference to the server subject. */
+  override def destroy(): Unit = {
+    serverSubject = null
+  }
+
+  /**
+   * Authenticates a request carrying a SPNEGO (Negotiate) token.
+   *
+   * The authorization value returned by `getAuthorization` is base64-decoded, the server
+   * principal embedded in the token is checked to start with "HTTP/", and the handshake is
+   * then run as the server subject via [[runWithPrincipal]].
+   *
+   * @param request the incoming HTTP request
+   * @param response the HTTP response; `runWithPrincipal` sets its status and may set the
+   *                 authenticate header on it
+   * @return whatever `runWithPrincipal` returns: the authenticated user's short name, or
+   *         `null` while the security context is not yet established
+   * @throws IOException if the privileged action fails with an IOException
+   * @throws AuthenticationException for any other failure, including a token that is not
+   *                                 valid base64
+   */
+  override def authenticate(
+      request: HttpServletRequest,
+      response: HttpServletResponse): String = {
+    val authorization = getAuthorization(request)
+    try {
+      // Decode inside the try block: a malformed base64 token must be reported as an
+      // authentication failure rather than escape as an unchecked IllegalArgumentException.
+      val clientToken = Base64.getDecoder.decode(authorization)
+      val serverPrincipal = getTokenServerName(clientToken)
+      if (!serverPrincipal.startsWith("HTTP/")) {
+        throw new IllegalArgumentException(
+          s"Invalid server principal $serverPrincipal decoded from client request")
+      }
+      Subject.doAs(
+        serverSubject,
+        new PrivilegedExceptionAction[String] {
+          override def run(): String = {
+            runWithPrincipal(serverPrincipal, clientToken, response)
+          }
+        })
+    } catch {
+      case ex: PrivilegedActionException =>
+        // Unwrap the failure raised inside the privileged action.
+        ex.getException match {
+          case ioe: IOException =>
+            throw ioe
+          case e: Exception =>
+            throw new AuthenticationException("SPNEGO authentication failed", e)
+        }
+
+      case e: Exception =>
+        throw new AuthenticationException("SPNEGO authentication failed", e)
+    }
+  }
+
+  /**
+   * Performs one accept step of the GSS-API handshake for `clientToken`.
+   *
+   * A non-empty token produced by the acceptor is base64-encoded and sent back in the
+   * header named by `WWW_AUTHENTICATE_HEADER`. While the context is not established the
+   * response status is set to `SC_UNAUTHORIZED` and `null` is returned; once it is
+   * established the status is set to `SC_OK` and the short name of the client principal is
+   * returned. The GSS context and credential are disposed before the method returns.
+   *
+   * @param serverPrincipal server principal used to create the acceptor credential
+   * @param clientToken decoded token received from the client
+   * @param response HTTP response that receives the status and authenticate header
+   * @return the client's short user name, or `null` if the handshake is still in progress
+   */
+  def runWithPrincipal(
+      serverPrincipal: String,
+      clientToken: Array[Byte],
+      response: HttpServletResponse): String = {
+    var acceptorCreds: GSSCredential = null
+    var acceptorContext: GSSContext = null
+    try {
+      logDebug(s"SPNEGO initialized with server principal $serverPrincipal")
+      val serverName = gssManager.createName(serverPrincipal, NT_GSS_KRB5_PRINCIPAL_OID)
+      val mechanisms = Array[Oid](GSS_SPNEGO_MECH_OID, GSS_KRB5_MECH_OID)
+      acceptorCreds = gssManager.createCredential(
+        serverName,
+        GSSCredential.INDEFINITE_LIFETIME,
+        mechanisms,
+        GSSCredential.ACCEPT_ONLY)
+      acceptorContext = gssManager.createContext(acceptorCreds)
+      val replyToken = acceptorContext.acceptSecContext(clientToken, 0, clientToken.length)
+      if (replyToken != null && replyToken.nonEmpty) {
+        val encodedReply = Base64.getEncoder.encodeToString(replyToken)
+        response.setHeader(WWW_AUTHENTICATE_HEADER, s"$NEGOTIATE $encodedReply")
+      }
+      if (acceptorContext.isEstablished) {
+        val clientPrincipal = acceptorContext.getSrcName.toString
+        val shortName = new KerberosName(clientPrincipal).getShortName
+        response.setStatus(HttpServletResponse.SC_OK)
+        logDebug(s"SPNEGO completed for client principal $clientPrincipal")
+        shortName
+      } else {
+        response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+        logDebug("SPNEGO in progress")
+        null
+      }
+    } finally {
+      // Release the GSS resources whether or not the handshake step succeeded.
+      if (acceptorContext != null) {
+        acceptorContext.dispose()
+      }
+      if (acceptorCreds != null) {
+        acceptorCreds.dispose()
+      }
+    }
+  }
+}
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
new file mode 100644
index 000000000..8b7921777
--- /dev/null
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+import javax.servlet.http.HttpServletResponse
+import javax.ws.rs.core.MediaType
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.authentication.HttpAuthSchemes
+import 
org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
+import 
org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl,
 UserDefineTokenAuthenticationProviderImpl}
+
+abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
+  // Enable metrics and configure the BASIC and BEARER schemes, each backed by the
+  // user-defined test providers, for both the master and the worker HTTP servers.
+  celebornConf
+    .set(CelebornConf.METRICS_ENABLED.key, "true")
+    .set(
+      CelebornConf.METRICS_CONF.key,
+      
Thread.currentThread().getContextClassLoader.getResource("metrics-api.properties").getFile)
+    .set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC", 
"BEARER"))
+    .set(
+      CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER,
+      classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+    .set(
+      CelebornConf.MASTER_HTTP_AUTH_BEARER_PROVIDER,
+      classOf[UserDefineTokenAuthenticationProviderImpl].getName)
+    .set(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC", 
"BEARER"))
+    .set(
+      CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER,
+      classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+    .set(
+      CelebornConf.WORKER_HTTP_AUTH_BEARER_PROVIDER,
+      classOf[UserDefineTokenAuthenticationProviderImpl].getName)
+
+  /**
+   * Builds a Basic authorization header value: the BASIC scheme, a space, then the
+   * base64 encoding of "user:password".
+   */
+  def basicAuthorizationHeader(user: String, password: String): String = {
+    // Use an explicit charset so the encoded credentials do not depend on the JVM default.
+    val credentials = s"$user:$password".getBytes(StandardCharsets.UTF_8)
+    HttpAuthSchemes.BASIC + " " + Base64.getEncoder.encodeToString(credentials)
+  }
+
+  /**
+   * Builds a Bearer authorization header value: the BEARER scheme, a space, then the
+   * base64 encoding of `token`.
+   */
+  def bearerAuthorizationHeader(token: String): String = {
+    // Use an explicit charset so the encoded token does not depend on the JVM default.
+    val tokenBytes = token.getBytes(StandardCharsets.UTF_8)
+    HttpAuthSchemes.BEARER + " " + Base64.getEncoder.encodeToString(tokenBytes)
+  }
+
+  // Each listed API must accept valid BASIC and BEARER credentials and answer
+  // SC_FORBIDDEN for invalid ones.
+  Seq("conf", "listDynamicConfigs", "workerInfo", "shuffle", "applications").foreach { api =>
+    test(s"API $api authentication") {
+      // Issues a GET against the API with the given Authorization header value and
+      // returns the HTTP status code.
+      def statusWith(authorization: String): Int =
+        webTarget.path(api)
+          .request(MediaType.TEXT_PLAIN)
+          .header(AUTHORIZATION_HEADER, authorization)
+          .get()
+          .getStatus
+
+      val validBasic = basicAuthorizationHeader(
+        "user",
+        UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD)
+      assert(HttpServletResponse.SC_OK == statusWith(validBasic))
+
+      val invalidBasic = basicAuthorizationHeader("user", "invalid")
+      assert(HttpServletResponse.SC_FORBIDDEN == statusWith(invalidBasic))
+
+      val validBearer =
+        bearerAuthorizationHeader(UserDefineTokenAuthenticationProviderImpl.VALID_TOKEN)
+      assert(HttpServletResponse.SC_OK == statusWith(validBearer))
+
+      val invalidBearer = bearerAuthorizationHeader("bad_token")
+      assert(HttpServletResponse.SC_FORBIDDEN == statusWith(invalidBearer))
+    }
+  }
+
+  test("swagger api do not need authentication") {
+    // The swagger UI pages are requested without an Authorization header.
+    for (page <- Seq("swagger", "docs", "help")) {
+      val pageResponse = webTarget.path(page).request(MediaType.TEXT_HTML).get()
+      assert(HttpServletResponse.SC_OK == pageResponse.getStatus)
+      assert(pageResponse.readEntity(classOf[String]).contains("swagger-ui"))
+    }
+    // The OpenAPI documents (JSON and YAML) are requested the same way.
+    val specs = Seq(
+      "openapi.json" -> MediaType.APPLICATION_JSON,
+      "openapi.yaml" -> "application/yaml")
+    specs.foreach { case (spec, mediaType) =>
+      val specResponse = webTarget.path(spec).request(mediaType).get()
+      assert(HttpServletResponse.SC_OK == specResponse.getStatus)
+      assert(specResponse.readEntity(classOf[String]).contains("/conf"))
+    }
+  }
+
+  test("metrics api do not need authentication") {
+    // Both metrics endpoints are requested without an Authorization header. The status
+    // checks use HttpServletResponse.SC_OK, matching the other tests in this suite.
+    val prometheus =
+      webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get()
+    assert(HttpServletResponse.SC_OK == prometheus.getStatus)
+    assert(prometheus.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value"))
+
+    val json = webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get()
+    assert(HttpServletResponse.SC_OK == json.getStatus)
+    assert(json.readEntity(classOf[String]).contains("\"name\" : \"jvm.memory.heap.max\""))
+  }
+}
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
index 632d06c79..28e1ed9ed 100644
--- 
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.celeborn.server.common.http
 
+import javax.servlet.http.HttpServletResponse
 import javax.ws.rs.core.MediaType
 
 import org.apache.celeborn.common.CelebornConf
@@ -29,13 +30,13 @@ abstract class ApiBaseResourceSuite extends HttpTestHelper {
 
   test("ping") {
     val response = webTarget.path("ping").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
     assert(response.readEntity(classOf[String]) == "pong")
   }
 
   test("conf") {
     val response = webTarget.path("conf").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("listDynamicConfigs") {
@@ -43,57 +44,56 @@ abstract class ApiBaseResourceSuite extends HttpTestHelper {
       .queryParam("LEVEL", "TENANT")
       .request(MediaType.TEXT_PLAIN)
       .get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("workerInfo") {
     val response = 
webTarget.path("workerInfo").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("threadDump") {
     val response = 
webTarget.path("threadDump").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("shuffle") {
     val response = 
webTarget.path("shuffle").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("applications") {
     val response = 
webTarget.path("applications").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("listTopDiskUsedApps") {
     val response = 
webTarget.path("listTopDiskUsedApps").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
-  }
-
-  test("openapi.json") {
-    val response = 
webTarget.path("openapi.json").request(MediaType.APPLICATION_JSON).get()
-    assert(200 == response.getStatus)
-    assert(response.readEntity(classOf[String]).contains("/conf"))
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("swagger") {
     Seq("swagger", "docs", "help").foreach { path =>
       val response = webTarget.path(path).request(MediaType.TEXT_HTML).get()
-      assert(200 == response.getStatus)
+      assert(HttpServletResponse.SC_OK == response.getStatus)
       assert(response.readEntity(classOf[String]).contains("swagger-ui"))
     }
+    Seq(
+      "openapi.json" -> MediaType.APPLICATION_JSON,
+      "openapi.yaml" -> "application/yaml").foreach { case (path, mediaType) =>
+      val response = webTarget.path(path).request(mediaType).get()
+      assert(HttpServletResponse.SC_OK == response.getStatus)
+      assert(response.readEntity(classOf[String]).contains("/conf"))
+    }
   }
 
-  test("metrics/prometheus") {
-    val response = 
webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get()
-    assert(200 == response.getStatus)
+  test("metrics") {
+    var response = 
webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get()
+    assert(HttpServletResponse.SC_OK == response.getStatus)
     
assert(response.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value"))
-  }
 
-  test("metrics/json") {
-    val response = 
webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get()
-    assert(200 == response.getStatus)
+    response = 
webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get()
+    assert(HttpServletResponse.SC_OK == response.getStatus)
     assert(response.readEntity(classOf[String]).contains("\"name\" : 
\"jvm.memory.heap.max\""))
   }
 }
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
index 8674fc39c..6a6ee9242 100644
--- 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
@@ -64,6 +64,7 @@ trait HttpTestHelper extends AnyFunSuite
   override def beforeAll(): Unit = {
     super.beforeAll()
     restApiBaseSuite.setUp()
+    Thread.sleep(1000) // sleep for http server initialization
   }
 
   override def afterAll(): Unit = {
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala
new file mode 100644
index 000000000..7009c1302
--- /dev/null
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.security.Principal
+import javax.security.sasl.AuthenticationException
+
+import org.apache.celeborn.common.authentication.{BasicPrincipal, 
PasswdAuthenticationProvider}
+import org.apache.celeborn.common.internal.Logging
+import 
org.apache.celeborn.server.common.http.authentication.UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD
+
+/**
+ * Test-only password provider: any user is accepted as long as the password equals
+ * `VALID_PASSWORD`.
+ */
+class UserDefinePasswordAuthenticationProviderImpl
+  extends PasswdAuthenticationProvider with Logging {
+
+  /**
+   * @return a [[BasicPrincipal]] for `user` when the password matches
+   * @throws AuthenticationException when the password does not match
+   */
+  override def authenticate(user: String, password: String): Principal = {
+    // Reject first so the success path reads straight through.
+    if (password != VALID_PASSWORD) {
+      throw new AuthenticationException("Username or password is not valid!")
+    }
+    logInfo(s"Success log in of user: $user")
+    new BasicPrincipal(user)
+  }
+}
+
+object UserDefinePasswordAuthenticationProviderImpl {
+  // The only password the test provider accepts.
+  val VALID_PASSWORD = "password"
+}
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefineTokenAuthenticationProviderImpl.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefineTokenAuthenticationProviderImpl.scala
new file mode 100644
index 000000000..ab14d9a72
--- /dev/null
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefineTokenAuthenticationProviderImpl.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.security.Principal
+import javax.security.sasl.AuthenticationException
+
+import org.apache.celeborn.common.authentication.{BasicPrincipal, 
TokenAuthenticationProvider}
+import org.apache.celeborn.common.internal.Logging
+import 
org.apache.celeborn.server.common.http.authentication.UserDefineTokenAuthenticationProviderImpl.VALID_TOKEN
+
+/**
+ * Test-only token provider: a token is accepted only when it equals `VALID_TOKEN`.
+ */
+class UserDefineTokenAuthenticationProviderImpl
+  extends TokenAuthenticationProvider with Logging {
+
+  /**
+   * @return a [[BasicPrincipal]] named "user" when the token matches
+   * @throws AuthenticationException when the token does not match
+   */
+  override def authenticate(token: String): Principal = {
+    // Reject first so the success path reads straight through.
+    if (token != VALID_TOKEN) {
+      throw new AuthenticationException("Token is not valid!")
+    }
+    // NOTE(review): this writes the raw token to the log; tolerable only in a test fixture.
+    logInfo(s"Success log in of token: $token")
+    new BasicPrincipal("user")
+  }
+}
+
+object UserDefineTokenAuthenticationProviderImpl {
+  // The only token the test provider accepts.
+  val VALID_TOKEN = "token"
+}
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
index ccf5697d4..408e39436 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.celeborn.service.deploy.worker.http.api
 
+import javax.servlet.http.HttpServletResponse
 import javax.ws.rs.core.MediaType
 
 import org.apache.celeborn.server.common.HttpService
@@ -44,22 +45,22 @@ class ApiWorkerResourceSuite extends ApiBaseResourceSuite 
with MiniClusterFeatur
 
   test("listPartitionLocationInfo") {
     val response = 
webTarget.path("listPartitionLocationInfo").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("unavailablePeers") {
     val response = 
webTarget.path("unavailablePeers").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("isShutdown") {
     val response = 
webTarget.path("isShutdown").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("isRegistered") {
     val response = 
webTarget.path("isRegistered").request(MediaType.TEXT_PLAIN).get()
-    assert(200 == response.getStatus)
+    assert(HttpServletResponse.SC_OK == response.getStatus)
   }
 
   test("isDecommissioning") {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
new file mode 100644
index 000000000..36ea0708f
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import org.apache.celeborn.common.util.CelebornExitKind
+import org.apache.celeborn.server.common.HttpService
+import 
org.apache.celeborn.server.common.http.{ApiBaseResourceAuthenticationSuite, 
ApiBaseResourceSuite}
+import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
+
+/**
+ * Runs the shared HTTP authentication tests with a Worker as the HTTP service under test.
+ */
+class ApiWorkerResourceAuthenticationSuite extends 
ApiBaseResourceAuthenticationSuite {
+  private var worker: Worker = _
+  override protected def httpService: HttpService = worker
+
+  override def beforeAll(): Unit = {
+    // Create the worker and start its metrics system and HTTP server before the
+    // parent's beforeAll runs.
+    val workerArgs = new WorkerArguments(Array(), celebornConf)
+    worker = new Worker(celebornConf, workerArgs)
+    worker.metricsSystem.start()
+    worker.startHttpServer()
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    // Run the parent's teardown first, then stop what beforeAll started on the worker.
+    super.afterAll()
+    worker.metricsSystem.stop()
+    worker.rpcEnv.shutdown()
+    worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+  }
+}

Reply via email to