This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 09d3a3b05 [CELEBORN-1493] Check admin privileges for http mutative 
requests
09d3a3b05 is described below

commit 09d3a3b05fcec32ee1e10da748d5ebba4dde195f
Author: Fei Wang <[email protected]>
AuthorDate: Fri Jul 12 11:51:35 2024 +0800

    [CELEBORN-1493] Check admin privileges for http mutative requests
    
    ### What changes were proposed in this pull request?
    
    If authentication enabled, check admin privileges for http mutative 
requests.
    
    Likes:
    
    ```
    POST /api/v1/workers/exclude
    POST /api/v1/workers/events
    POST /api/v1/workers/exit
    ```
    
    ### Why are the changes needed?
    
    For security requirement.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, after this pr, if http authentication enabled, for all mutative http 
requests, it will check the admin privileges.
    
    Before this PR, if an API is not defined and the method is 
`POST/PUT/DELETE/PATCH`, the response status code is `404`.
    After this PR, if the admin privileges check failed, the response status 
code will be `403`.
    
    ### How was this patch tested?
    UT.
    
    Closes #2601 from turboFei/admin_check.
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 22 ++++++++++++++++
 docs/configuration/master.md                       |  1 +
 docs/configuration/worker.md                       |  1 +
 .../http/authentication/AuthenticationFilter.scala | 26 +++++++++++++++++--
 .../http/ApiBaseResourceAuthenticationSuite.scala  | 30 ++++++++++++++++++++++
 5 files changed, 78 insertions(+), 2 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a2b2e284f..5d89945ae 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2271,6 +2271,17 @@ object CelebornConf extends Logging {
       .stringConf
       .createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
 
+  val MASTER_HTTP_AUTH_ADMINISTERS: ConfigEntry[Seq[String]] =
+    buildConf("celeborn.master.http.auth.administers")
+      .categories("master")
+      .version("0.6.0")
+      .doc("A comma-separated list of users who have admin privileges," +
+        s" Note, when ${MASTER_HTTP_AUTH_SUPPORTED_SCHEMES.key} is not set," +
+        s" everyone is treated as administrator.")
+      .stringConf
+      .toSequence
+      .createWithDefault(Seq.empty)
+
   val HA_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.master.ha.enabled")
       .withAlternative("celeborn.ha.enabled")
@@ -2968,6 +2979,17 @@ object CelebornConf extends Logging {
       .stringConf
       .createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
 
+  val WORKER_HTTP_AUTH_ADMINISTERS: ConfigEntry[Seq[String]] =
+    buildConf("celeborn.worker.http.auth.administers")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("A comma-separated list of users who have admin privileges," +
+        s" Note, when ${WORKER_HTTP_AUTH_SUPPORTED_SCHEMES.key} is not set," +
+        s" everyone is treated as administrator.")
+      .stringConf
+      .toSequence
+      .createWithDefault(Seq.empty)
+
   val WORKER_RPC_PORT: ConfigEntry[Int] =
     buildConf("celeborn.worker.rpc.port")
       .categories("worker")
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index efa455451..5c5bd8ac2 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -43,6 +43,7 @@ license: |
 | celeborn.master.heartbeat.application.timeout | 300s | false | Application 
heartbeat timeout. | 0.3.0 | celeborn.application.heartbeat.timeout | 
 | celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat 
timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout | 
 | celeborn.master.host | &lt;localhost&gt; | false | Hostname for master to 
bind. | 0.2.0 |  | 
+| celeborn.master.http.auth.administers |  | false | A comma-separated list of 
users who have admin privileges, Note, when 
celeborn.master.http.auth.supportedSchemes is not set, everyone is treated as 
administrator. | 0.6.0 |  | 
 | celeborn.master.http.auth.basic.provider | 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | 
false | User-defined password authentication implementation of 
org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0 
|  | 
 | celeborn.master.http.auth.bearer.provider | 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | 
false | User-defined token authentication implementation of 
org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 | 
 | 
 | celeborn.master.http.auth.supportedSchemes |  | false | A comma-separated 
list of master http auth supported schemes.<ul> <li>SPNEGO: Kerberos/GSSAPI 
authentication.</li> <li>BASIC: User-defined password authentication, the 
concreted implementation is configurable via 
`celeborn.master.http.auth.basic.provider`.</li> <li>BEARER: User-defined 
bearer token authentication, the concreted implementation is configurable via 
`celeborn.master.http.auth.bearer.provider`.</li></ul> | 0.6.0 |  | 
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 53938ccc0..14addd604 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -86,6 +86,7 @@ license: |
 | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s | 
false | Interval for a Celeborn worker to flush committed file infos into Level 
DB. | 0.3.1 |  | 
 | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false 
| Whether to call sync method to save committed file infos into Level DB to 
handle OS crash. | 0.3.1 |  | 
 | celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's 
graceful shutdown timeout time. | 0.2.0 |  | 
+| celeborn.worker.http.auth.administers |  | false | A comma-separated list of 
users who have admin privileges, Note, when 
celeborn.worker.http.auth.supportedSchemes is not set, everyone is treated as 
administrator. | 0.6.0 |  | 
 | celeborn.worker.http.auth.basic.provider | 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | 
false | User-defined password authentication implementation of 
org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0 
|  | 
 | celeborn.worker.http.auth.bearer.provider | 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | 
false | User-defined token authentication implementation of 
org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 | 
 | 
 | celeborn.worker.http.auth.supportedSchemes |  | false | A comma-separated 
list of worker http auth supported schemes.<ul> <li>SPNEGO: Kerberos/GSSAPI 
authentication.</li> <li>BASIC: User-defined password authentication, the 
concreted implementation is configurable via 
`celeborn.worker.http.auth.basic.provider`.</li> <li>BEARER: User-defined 
bearer token authentication, the concreted implementation is configurable via 
`celeborn.worker.http.auth.bearer.provider`.</li></ul> | 0.6.0 |  | 
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationFilter.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationFilter.scala
index 650ad268e..b90412ff4 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationFilter.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationFilter.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 import javax.security.sasl.AuthenticationException
 import javax.servlet.{Filter, FilterChain, FilterConfig, ServletException, 
ServletRequest, ServletResponse}
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.ws.rs.HttpMethod
 
 import scala.collection.mutable
 
@@ -64,6 +65,13 @@ class AuthenticationFilter(conf: CelebornConf, serviceName: 
String) extends Filt
     case Service.WORKER => 
conf.get(CelebornConf.WORKER_HTTP_PROXY_CLIENT_IP_HEADER)
   }
 
+  private val administrators: Set[String] = serviceName match {
+    case Service.MASTER =>
+      conf.get(CelebornConf.MASTER_HTTP_AUTH_ADMINISTERS).toSet
+    case Service.WORKER =>
+      conf.get(CelebornConf.WORKER_HTTP_AUTH_ADMINISTERS).toSet
+  }
+
   private def initAuthHandlers(): Unit = {
     if (authSchemes.contains(HttpAuthSchemes.NEGOTIATE)) {
       serviceName match {
@@ -160,8 +168,15 @@ class AuthenticationFilter(conf: CelebornConf, 
serviceName: String) extends Filt
     }
   }
 
+  private def isMutativeRequest(httpRequest: HttpServletRequest): Boolean = {
+    HttpMethod.POST.equalsIgnoreCase(httpRequest.getMethod) ||
+    HttpMethod.PUT.equalsIgnoreCase(httpRequest.getMethod) ||
+    HttpMethod.DELETE.equalsIgnoreCase(httpRequest.getMethod) ||
+    HttpMethod.PATCH.equalsIgnoreCase(httpRequest.getMethod)
+  }
+
   /**
-   * Delegates call to the servlet filter chain. Sub-classes my override this
+   * Delegates call to the servlet filter chain. Sub-classes may override this
    * method to perform pre and post tasks.
    *
    * @param filterChain the filter chain object.
@@ -176,7 +191,14 @@ class AuthenticationFilter(conf: CelebornConf, 
serviceName: String) extends Filt
       filterChain: FilterChain,
       request: HttpServletRequest,
       response: HttpServletResponse): Unit = {
-    filterChain.doFilter(request, response)
+    if (isMutativeRequest(request) && 
!administrators.contains(HTTP_CLIENT_IDENTIFIER.get())) {
+      response.setStatus(HttpServletResponse.SC_FORBIDDEN)
+      response.sendError(
+        HttpServletResponse.SC_FORBIDDEN,
+        s"${HTTP_CLIENT_IDENTIFIER.get()} does not have admin privilege to 
perform ${request.getMethod} action")
+    } else {
+      filterChain.doFilter(request, response)
+    }
   }
 
   override def destroy(): Unit = {
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
index 938454707..41aa3269c 100644
--- 
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
@@ -29,6 +29,7 @@ import 
org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
 import 
org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl,
 UserDefineTokenAuthenticationProviderImpl}
 
 abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
+  val administers = Seq("celeborn", "celeborn2")
   celebornConf
     .set(CelebornConf.METRICS_ENABLED.key, "true")
     .set(
@@ -48,6 +49,8 @@ abstract class ApiBaseResourceAuthenticationSuite extends 
HttpTestHelper {
     .set(
       CelebornConf.WORKER_HTTP_AUTH_BEARER_PROVIDER,
       classOf[UserDefineTokenAuthenticationProviderImpl].getName)
+    .set(CelebornConf.MASTER_HTTP_AUTH_ADMINISTERS, administers)
+    .set(CelebornConf.WORKER_HTTP_AUTH_ADMINISTERS, administers)
 
   def basicAuthorizationHeader(user: String, password: String): String =
     HttpAuthSchemes.BASIC + " " + new String(
@@ -117,4 +120,31 @@ abstract class ApiBaseResourceAuthenticationSuite extends 
HttpTestHelper {
     assert(200 == response.getStatus)
     assert(response.readEntity(classOf[String]).contains("\"name\" : 
\"jvm.memory.heap.max\""))
   }
+
+  test("check admin privilege for mutative request") {
+    Seq("/any_api", "/api/v1/any_api").foreach { api =>
+      var response = webTarget.path(api)
+        .request(MediaType.TEXT_PLAIN)
+        .header(
+          AUTHORIZATION_HEADER,
+          basicAuthorizationHeader(
+            "no_admin",
+            UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
+        .post(null)
+      assert(HttpServletResponse.SC_FORBIDDEN == response.getStatus)
+
+      administers.foreach { admin =>
+        response = webTarget.path(api)
+          .request(MediaType.TEXT_PLAIN)
+          .header(
+            AUTHORIZATION_HEADER,
+            basicAuthorizationHeader(
+              admin,
+              UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
+          .post(null)
+        // pass the admin privilege check, but the api is not found
+        assert(HttpServletResponse.SC_NOT_FOUND == response.getStatus)
+      }
+    }
+  }
 }

Reply via email to