This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new adbc77cd4 [CELEBORN-1317] Refine celeborn http server and support 
swagger ui
adbc77cd4 is described below

commit adbc77cd4ff5a648d25fe01c3dabc87ee5a4869f
Author: Fei Wang <[email protected]>
AuthorDate: Wed Mar 27 23:18:18 2024 +0800

    [CELEBORN-1317] Refine celeborn http server and support swagger ui
    
    ### What changes were proposed in this pull request?
    
    Before, there is no http request spec likes query param, http method and 
response mediaType.
    And for each api, a HttpEndpoint class is needed.
    
    In this PR, we refine the code for http service and provide swagger ui.
    
    Note that: This pr does not change the orignal api request and response 
behavior, including metrics APIs.
    
    TODO:
    1. define DTO
    2. http request authentication
    
    <img width="1900" alt="image" 
src="https://github.com/apache/incubator-celeborn/assets/6757692/7f8c2363-170d-4bdf-b2c9-74260e31d3e5";>
    
    <img width="1138" alt="image" 
src="https://github.com/apache/incubator-celeborn/assets/6757692/3ae6ec8e-00a8-475b-bb37-0329536185f6";>
    
    ### Why are the changes needed?
    
    To close CELEBORN-1317
    
    ### Does this PR introduce _any_ user-facing change?
    
    The api is align with before.
    
    ### How was this patch tested?
    UT.
    
    Closes #2371 from turboFei/jetty.
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 LICENSE-binary                                     |  25 ++
 .../org/apache/celeborn/common/CelebornConf.scala  |  40 +++
 .../celeborn/common/metrics/MetricsUtils.scala     |  22 +-
 .../celeborn/common/metrics/sink/CsvSink.scala     |   4 +-
 .../common/metrics/sink/GraphiteSink.scala         |   4 +-
 dev/deps/dependencies-server                       |  44 +++-
 docs/configuration/master.md                       |   2 +
 docs/configuration/worker.md                       |   2 +
 master/pom.xml                                     |  16 ++
 .../deploy/master/http/api/ApiMasterResource.scala | 112 +++++++++
 .../deploy/master/ApiMasterResourceSuite.scala     | 108 +++++++++
 pom.xml                                            | 112 ++++++++-
 project/CelebornBuild.scala                        |  56 ++++-
 service/pom.xml                                    | 100 +++++++-
 .../org/apache/celeborn/swagger/index.html         |  73 ++++++
 .../celeborn/common/metrics/MetricsSystem.scala    |   9 +-
 .../common/metrics/sink/AbstractServlet.scala      |  18 +-
 .../celeborn/common/metrics/sink/JsonServlet.scala |  21 +-
 .../common/metrics/sink/PrometheusServlet.scala    |  20 +-
 .../celeborn/server/common/HttpService.scala       |  56 ++++-
 .../celeborn/server/common/http/HttpEndpoint.scala | 269 ---------------------
 .../server/common/http/HttpRequestHandler.scala    |  77 ------
 .../celeborn/server/common/http/HttpServer.scala   | 137 +++++++----
 .../celeborn/server/common/http/HttpUtils.scala    | 165 ++++++++-----
 .../server/common/http/api/ApiBaseResource.scala   | 127 ++++++++++
 .../server/common/http/api/ApiRequestContext.scala |  50 ++++
 .../ApiRootResource.scala}                         |  25 +-
 .../common/http/api/CelebornOpenApiResource.scala  |  98 ++++++++
 .../CelebornScalaObjectMapper.scala}               |  20 +-
 .../OpenAPIConfig.scala}                           |  24 +-
 service/src/test/resources/metrics-api.properties  |  18 ++
 .../server/common/http/ApiBaseResourceSuite.scala  |  99 ++++++++
 .../server/common/http/HttpTestHelper.scala        |  78 ++++++
 .../server/common/http/HttpUtilsSuite.scala        | 121 ---------
 tests/spark-it/pom.xml                             |   8 +
 worker/pom.xml                                     |  16 ++
 .../deploy/worker/http/api/ApiWorkerResource.scala |  78 ++++++
 .../worker/storage/ApiWorkerResourceSuite.scala    |  66 +++++
 38 files changed, 1615 insertions(+), 705 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index a33986495..034141317 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -248,6 +248,11 @@ io.netty:netty-transport-native-unix-common
 io.netty:netty-transport-rxtx
 io.netty:netty-transport-sctp
 io.netty:netty-transport-udt
+io.swagger.core.v3:swagger-annotations
+io.swagger.core.v3:swagger-core
+io.swagger.core.v3:swagger-integration
+io.swagger.core.v3:swagger-jaxrs2
+io.swagger.core.v3:swagger-models
 org.apache.commons:commons-crypto
 org.apache.commons:commons-lang3
 org.apache.hadoop:hadoop-client-api
@@ -267,6 +272,13 @@ org.apache.ratis:ratis-server
 org.apache.ratis:ratis-server-api
 org.apache.ratis:ratis-shell
 org.apache.ratis:ratis-thirdparty-misc
+org.eclipse.jetty:jetty-http
+org.eclipse.jetty:jetty-io
+org.eclipse.jetty:jetty-security
+org.eclipse.jetty:jetty-server
+org.eclipse.jetty:jetty-servlet
+org.eclipse.jetty:jetty-util-ajax
+org.eclipse.jetty:jetty-util
 org.javassist:javassist
 org.reflections:reflections
 org.roaringbitmap:RoaringBitmap
@@ -275,6 +287,7 @@ org.rocksdb:rocksdbjni
 org.scala-lang:scala-library
 org.scala-lang:scala-reflect
 org.slf4j:jcl-over-slf4j
+org.webjars:swagger-ui
 org.xerial.snappy:snappy-java
 org.yaml:snakeyaml
 
@@ -308,3 +321,15 @@ org.slf4j:slf4j-api
 ------------
 See licenses/LICENSE-javassist.txt for detail.
 org.javassist:javassist
+
+Eclipse Public License (EPL) 2.0
+--------------------------------
+jakarta.annotation:jakarta.annotation-api
+jakarta.servlet:jakarta.servlet-api
+jakarta.ws.rs:jakarta.ws.rs-api
+org.glassfish.hk2:hk2-api
+org.glassfish.hk2:hk2-locator
+org.glassfish.hk2:hk2-utils
+org.glassfish.hk2.external:aopalliance-repackaged
+org.glassfish.hk2.external:jakarta.inject
+org.glassfish.hk2:osgi-resource-locator
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 905cfad57..2e4bd2d5a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -586,6 +586,10 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
 
   def masterHttpPort: Int = get(MASTER_HTTP_PORT)
 
+  def masterHttpMaxWorkerThreads: Int = get(MASTER_HTTP_MAX_WORKER_THREADS)
+
+  def masterHttpStopTimeout: Long = get(MASTER_HTTP_STOP_TIMEOUT)
+
   def haEnabled: Boolean = get(HA_ENABLED)
 
   def haMasterNodeId: Option[String] = get(HA_MASTER_NODE_ID)
@@ -676,6 +680,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def workerHttpHost: String =
     get(WORKER_HTTP_HOST).replace("<localhost>", Utils.localHostName(this))
   def workerHttpPort: Int = get(WORKER_HTTP_PORT)
+  def workerHttpMaxWorkerThreads: Int = get(WORKER_HTTP_MAX_WORKER_THREADS)
+  def workerHttpStopTimeout: Long = get(WORKER_HTTP_STOP_TIMEOUT)
   def workerRpcPort: Int = get(WORKER_RPC_PORT)
   def workerPushPort: Int = get(WORKER_PUSH_PORT)
   def workerFetchPort: Int = get(WORKER_FETCH_PORT)
@@ -1976,6 +1982,23 @@ object CelebornConf extends Logging {
       .checkValue(p => p >= 1024 && p < 65535, "Invalid port")
       .createWithDefault(9098)
 
+  val MASTER_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] =
+    buildConf("celeborn.master.http.maxWorkerThreads")
+      .categories("master")
+      .version("0.5.0")
+      .doc("Maximum number of threads in the master http worker thread pool.")
+      .intConf
+      .checkValue(_ > 0, "Must be positive.")
+      .createWithDefault(999)
+
+  val MASTER_HTTP_STOP_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.master.http.stopTimeout")
+      .categories("master")
+      .version("0.5.0")
+      .doc("Master http server stop timeout.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("5s")
+
   val HA_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.master.ha.enabled")
       .withAlternative("celeborn.ha.enabled")
@@ -2534,6 +2557,23 @@ object CelebornConf extends Logging {
       .checkValue(p => p >= 1024 && p < 65535, "Invalid port")
       .createWithDefault(9096)
 
+  val WORKER_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] =
+    buildConf("celeborn.worker.http.maxWorkerThreads")
+      .categories("worker")
+      .version("0.5.0")
+      .doc("Maximum number of threads in the worker http worker thread pool.")
+      .intConf
+      .checkValue(_ > 0, "Must be positive.")
+      .createWithDefault(999)
+
+  val WORKER_HTTP_STOP_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.worker.http.stopTimeout")
+      .categories("worker")
+      .version("0.5.0")
+      .doc("Worker http server stop timeout.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("5s")
+
   val WORKER_RPC_PORT: ConfigEntry[Int] =
     buildConf("celeborn.worker.rpc.port")
       .categories("worker")
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
 b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsUtils.scala
similarity index 57%
copy from 
service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
copy to 
common/src/main/scala/org/apache/celeborn/common/metrics/MetricsUtils.scala
index 2619606b5..0d5086bbb 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsUtils.scala
@@ -15,19 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.server.common.http
+package org.apache.celeborn.common.metrics
 
-import io.netty.channel.{ChannelInitializer, SimpleChannelInboundHandler}
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec}
+import java.util.concurrent.TimeUnit
 
-class HttpServerInitializer(
-    handlers: SimpleChannelInboundHandler[_]) extends 
ChannelInitializer[SocketChannel] {
+object MetricsUtils {
+  private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+  private[this] val MINIMAL_POLL_PERIOD = 1
 
-  override def initChannel(channel: SocketChannel): Unit = {
-    val pipeline = channel.pipeline()
-    pipeline.addLast(new HttpServerCodec())
-      .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
-      .addLast(handlers)
+  def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
+    val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
+    if (period < MINIMAL_POLL_PERIOD) {
+      throw new IllegalArgumentException("Polling period " + pollPeriod + " " 
+ pollUnit +
+        " below than minimal polling period ")
+    }
   }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala 
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala
index 34097b34d..52ccf4638 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 
 import com.codahale.metrics.{CsvReporter, MetricRegistry}
 
-import org.apache.celeborn.common.metrics.MetricsSystem
+import org.apache.celeborn.common.metrics.MetricsUtils
 
 class CsvSink(val property: Properties, val registry: MetricRegistry) extends 
Sink {
   val CSV_KEY_PERIOD = "period"
@@ -44,7 +44,7 @@ class CsvSink(val property: Properties, val registry: 
MetricRegistry) extends Si
     case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
   }
 
-  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+  MetricsUtils.checkMinimalPollingPeriod(pollUnit, pollPeriod)
 
   val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
     case Some(s) => s
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala
index cc97b9bef..b2be72fff 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 import com.codahale.metrics.MetricRegistry
 import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP}
 
-import org.apache.celeborn.common.metrics.MetricsSystem
+import org.apache.celeborn.common.metrics.MetricsUtils
 
 private class GraphiteSink(val property: Properties, val registry: 
MetricRegistry) extends Sink {
   val GRAPHITE_DEFAULT_PERIOD = 10
@@ -62,7 +62,7 @@ private class GraphiteSink(val property: Properties, val 
registry: MetricRegistr
 
   val prefix = 
propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX)
 
-  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+  MetricsUtils.checkMinimalPollingPeriod(pollUnit, pollPeriod)
 
   val graphite = 
propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase(Locale.ROOT)) match {
     case Some("udp") => new GraphiteUDP(host, port)
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index eadf37b17..ca5a32e7e 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -17,7 +17,9 @@
 
 HikariCP/4.0.3//HikariCP-4.0.3.jar
 RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar
+aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
 ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
+classgraph/4.8.138//classgraph-4.8.138.jar
 commons-cli/1.5.0//commons-cli-1.5.0.jar
 commons-crypto/1.0.0//commons-crypto-1.0.0.jar
 commons-io/2.13.0//commons-io-2.13.0.jar
@@ -27,13 +29,43 @@ failureaccess/1.0.1//failureaccess-1.0.1.jar
 guava/32.1.3-jre//guava-32.1.3-jre.jar
 hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar
 hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
+hk2-api/2.6.1//hk2-api-2.6.1.jar
+hk2-locator/2.6.1//hk2-locator-2.6.1.jar
+hk2-utils/2.6.1//hk2-utils-2.6.1.jar
 jackson-annotations/2.15.3//jackson-annotations-2.15.3.jar
 jackson-core/2.15.3//jackson-core-2.15.3.jar
 jackson-databind/2.15.3//jackson-databind-2.15.3.jar
+jackson-dataformat-yaml/2.13.2//jackson-dataformat-yaml-2.13.2.jar
+jackson-datatype-jsr310/2.13.2//jackson-datatype-jsr310-2.13.2.jar
+jackson-jaxrs-base/2.13.2//jackson-jaxrs-base-2.13.2.jar
+jackson-jaxrs-json-provider/2.13.2//jackson-jaxrs-json-provider-2.13.2.jar
+jackson-module-jaxb-annotations/2.14.1//jackson-module-jaxb-annotations-2.14.1.jar
 jackson-module-scala_2.12/2.15.3//jackson-module-scala_2.12-2.15.3.jar
-javassist/3.28.0-GA//javassist-3.28.0-GA.jar
-javax.servlet-api/3.1.0//javax.servlet-api-3.1.0.jar
+jakarta.annotation-api/1.3.5//jakarta.annotation-api-1.3.5.jar
+jakarta.inject/2.6.1//jakarta.inject-2.6.1.jar
+jakarta.servlet-api/4.0.4//jakarta.servlet-api-4.0.4.jar
+jakarta.validation-api/2.0.2//jakarta.validation-api-2.0.2.jar
+jakarta.ws.rs-api/2.1.6//jakarta.ws.rs-api-2.1.6.jar
+jakarta.xml.bind-api/2.3.3//jakarta.xml.bind-api-2.3.3.jar
+javassist/3.29.0-GA//javassist-3.29.0-GA.jar
 jcl-over-slf4j/1.7.36//jcl-over-slf4j-1.7.36.jar
+jersey-client/2.39.1//jersey-client-2.39.1.jar
+jersey-common/2.39.1//jersey-common-2.39.1.jar
+jersey-container-servlet-core/2.39.1//jersey-container-servlet-core-2.39.1.jar
+jersey-entity-filtering/2.39.1//jersey-entity-filtering-2.39.1.jar
+jersey-hk2/2.39.1//jersey-hk2-2.39.1.jar
+jersey-media-json-jackson/2.39.1//jersey-media-json-jackson-2.39.1.jar
+jersey-media-multipart/2.39.1//jersey-media-multipart-2.39.1.jar
+jersey-server/2.39.1//jersey-server-2.39.1.jar
+jetty-client/9.4.52.v20230823//jetty-client-9.4.52.v20230823.jar
+jetty-http/9.4.52.v20230823//jetty-http-9.4.52.v20230823.jar
+jetty-io/9.4.52.v20230823//jetty-io-9.4.52.v20230823.jar
+jetty-proxy/9.4.52.v20230823//jetty-proxy-9.4.52.v20230823.jar
+jetty-security/9.4.52.v20230823//jetty-security-9.4.52.v20230823.jar
+jetty-server/9.4.52.v20230823//jetty-server-9.4.52.v20230823.jar
+jetty-servlet/9.4.52.v20230823//jetty-servlet-9.4.52.v20230823.jar
+jetty-util-ajax/9.4.52.v20230823//jetty-util-ajax-9.4.52.v20230823.jar
+jetty-util/9.4.52.v20230823//jetty-util-9.4.52.v20230823.jar
 jsr305/1.3.9//jsr305-1.3.9.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
@@ -46,6 +78,7 @@ maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
 metrics-core/3.2.6//metrics-core-3.2.6.jar
 metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
 metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
+mimepull/1.9.15//mimepull-1.9.15.jar
 mybatis/3.5.15//mybatis-3.5.15.jar
 netty-all/4.1.107.Final//netty-all-4.1.107.Final.jar
 netty-buffer/4.1.107.Final//netty-buffer-4.1.107.Final.jar
@@ -81,6 +114,7 @@ 
netty-transport-rxtx/4.1.107.Final//netty-transport-rxtx-4.1.107.Final.jar
 netty-transport-sctp/4.1.107.Final//netty-transport-sctp-4.1.107.Final.jar
 netty-transport-udt/4.1.107.Final//netty-transport-udt-4.1.107.Final.jar
 netty-transport/4.1.107.Final//netty-transport-4.1.107.Final.jar
+osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
 protobuf-java/3.21.7//protobuf-java-3.21.7.jar
 ratis-client/2.5.1//ratis-client-2.5.1.jar
@@ -101,4 +135,10 @@ shims/0.9.32//shims-0.9.32.jar
 slf4j-api/1.7.36//slf4j-api-1.7.36.jar
 snakeyaml/2.2//snakeyaml-2.2.jar
 snappy-java/1.1.10.5//snappy-java-1.1.10.5.jar
+swagger-annotations/2.2.1//swagger-annotations-2.2.1.jar
+swagger-core/2.2.1//swagger-core-2.2.1.jar
+swagger-integration/2.2.1//swagger-integration-2.2.1.jar
+swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar
+swagger-models/2.2.1//swagger-models-2.2.1.jar
+swagger-ui/4.9.1//swagger-ui-4.9.1.jar
 zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 7a504441d..13f1d5c40 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -43,7 +43,9 @@ license: |
 | celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat 
timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout | 
 | celeborn.master.host | &lt;localhost&gt; | false | Hostname for master to 
bind. | 0.2.0 |  | 
 | celeborn.master.http.host | &lt;localhost&gt; | false | Master's http host. 
| 0.4.0 | 
celeborn.metrics.master.prometheus.host,celeborn.master.metrics.prometheus.host 
| 
+| celeborn.master.http.maxWorkerThreads | 999 | false | Maximum number of 
threads in the master http worker thread pool. | 0.5.0 |  | 
 | celeborn.master.http.port | 9098 | false | Master's http port. | 0.4.0 | 
celeborn.metrics.master.prometheus.port,celeborn.master.metrics.prometheus.port 
| 
+| celeborn.master.http.stopTimeout | 5s | false | Master http server stop 
timeout. | 0.5.0 |  | 
 | celeborn.master.internal.port | 8097 | false | Internal port on the master 
where both workers and other master nodes connect. | 0.5.0 |  | 
 | celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 |  | 
 | celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for 
refreshing the node rack information periodically. | 0.5.0 |  | 
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 68ad3f494..484cf1b6d 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -83,7 +83,9 @@ license: |
 | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false 
| Whether to call sync method to save committed file infos into Level DB to 
handle OS crash. | 0.3.1 |  | 
 | celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's 
graceful shutdown timeout time. | 0.2.0 |  | 
 | celeborn.worker.http.host | &lt;localhost&gt; | false | Worker's http host. 
| 0.4.0 | 
celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host 
| 
+| celeborn.worker.http.maxWorkerThreads | 999 | false | Maximum number of 
threads in the worker http worker thread pool. | 0.5.0 |  | 
 | celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 | 
celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port 
| 
+| celeborn.worker.http.stopTimeout | 5s | false | Worker http server stop 
timeout. | 0.5.0 |  | 
 | celeborn.worker.internal.port | 0 | false | Internal server port on the 
Worker where the master nodes connect. | 0.5.0 |  | 
 | celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling 
via async_profiler in workers. | 0.5.0 |  | 
 | celeborn.worker.jvmProfiler.localDir | . | false | Local file system path on 
worker where profiler output is saved. Defaults to the working directory of the 
worker process. | 0.5.0 |  | 
diff --git a/master/pom.xml b/master/pom.xml
index a5260a1ce..4dd3ed11c 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -83,11 +83,27 @@
       <artifactId>log4j-1.2-api</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.test-framework</groupId>
+      <artifactId>jersey-test-framework-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+      <artifactId>jersey-test-framework-provider-jetty</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
new file mode 100644
index 000000000..2093dee03
--- /dev/null
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master.http.api
+
+import javax.ws.rs.{GET, Path, POST, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.responses.ApiResponse
+
+import org.apache.celeborn.server.common.http.api.ApiRequestContext
+
+@Path("/")
+class ApiMasterResource extends ApiRequestContext {
+
+  @Path("/masterGroupInfo")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "List master group information of the service. It will list all master's 
LEADER, FOLLOWER information.")
+  @GET
+  def masterGroupInfo: String = httpService.getMasterGroupInfo
+
+  @Path("/lostWorkers")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all lost workers of the master.")
+  @GET
+  def lostWorkers: String = httpService.getLostWorkers
+
+  @Path("/excludedWorkers")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all excluded workers of the master.")
+  @GET
+  def excludedWorkers: String = httpService.getExcludedWorkers
+
+  @Path("/shutdownWorkers")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all shutdown workers of the master.")
+  @GET
+  def shutdownWorkers: String = httpService.getShutdownWorkers
+
+  @Path("/hostnames")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all running application's LifecycleManager's hostnames 
of the cluster.")
+  @GET
+  def hostnames: String = httpService.getHostnameList
+
+  @Path("/sendWorkerEvent")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "For Master(Leader) can send worker event to manager workers. Legal 
types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 
'Graceful', 'Recommission'")
+  @POST
+  def sendWorkerEvent(
+      @QueryParam("TYPE") eventType: String,
+      @QueryParam("WORKERS") workers: String): String = {
+    httpService.handleWorkerEvent(normalizeParam(eventType), 
normalizeParam(workers))
+  }
+
+  @Path("/workerEventInfo")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all worker event infos of the master.")
+  @GET
+  def workerEventInfo: String = httpService.getWorkerEventInfo()
+
+  @Path("/exclude")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all worker event infos of the master.")
+  @POST
+  def excludeWorkers(
+      @QueryParam("ADD") addWorkers: String,
+      @QueryParam("REMOVE") removeWorkers: String): String = {
+    httpService.exclude(normalizeParam(addWorkers), 
normalizeParam(removeWorkers))
+  }
+}
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
new file mode 100644
index 000000000..f5e06e6a5
--- /dev/null
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master
+
+import javax.ws.rs.core.MediaType
+
+import com.google.common.io.Files
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.server.common.HttpService
+import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
+
+class ApiMasterResourceSuite extends ApiBaseResourceSuite {
+  private var master: Master = _
+
+  override protected def httpService: HttpService = master
+
+  def getTmpDir(): String = {
+    val tmpDir = Files.createTempDir()
+    tmpDir.deleteOnExit()
+    tmpDir.getAbsolutePath
+  }
+
+  override def beforeAll(): Unit = {
+    val randomMasterPort = Utils.selectRandomPort(1024, 65535)
+    val randomHttpPort = randomMasterPort + 1
+    celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
+    celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
+    celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
+    celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
+    celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, 
randomHttpPort.toString)
+
+    val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
+
+    val masterArgs = new MasterArguments(args, celebornConf)
+    master = new Master(celebornConf, masterArgs)
+    new Thread() {
+      override def run(): Unit = {
+        master.initialize()
+      }
+    }.start()
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+    master.rpcEnv.shutdown()
+  }
+
+  test("masterGroupInfo") {
+    val response = 
webTarget.path("masterGroupInfo").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("lostWorkers") {
+    val response = 
webTarget.path("lostWorkers").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("excludedWorkers") {
+    val response = 
webTarget.path("excludedWorkers").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("shutdownWorkers") {
+    val response = 
webTarget.path("shutdownWorkers").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("hostnames") {
+    val response = 
webTarget.path("hostnames").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("sendWorkerEvent") {
+    val response = webTarget.path("sendWorkerEvent")
+      .request(MediaType.TEXT_PLAIN)
+      .post(null)
+    assert(200 == response.getStatus)
+  }
+
+  test("workerEventInfo") {
+    val response = 
webTarget.path("workerEventInfo").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("exclude") {
+    val response = 
webTarget.path("exclude").request(MediaType.TEXT_PLAIN).post(null)
+    assert(200 == response.getStatus)
+  }
+}
diff --git a/pom.xml b/pom.xml
index fdd60e69f..00a718a4c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,7 +80,6 @@
     <google.jsr305.version>1.3.9</google.jsr305.version>
     <grpc.version>1.44.0</grpc.version>
     <guava.version>32.1.3-jre</guava.version>
-    <javaxservlet.version>3.1.0</javaxservlet.version>
     <junit.version>4.13.2</junit.version>
     <leveldb.version>1.8</leveldb.version>
     <log4j2.version>2.17.2</log4j2.version>
@@ -107,6 +106,13 @@
     <hikaricp.version>4.0.3</hikaricp.version>
     <h2.version>2.2.224</h2.version>
 
+    <!-- RESTful service dependencies -->
+    <swagger.version>2.2.1</swagger.version>
+    <swagger-ui.version>4.9.1</swagger-ui.version>
+    <jersey.version>2.39.1</jersey.version>
+    <jetty.version>9.4.52.v20230823</jetty.version>
+    <jakarta.servlet-api.version>4.0.4</jakarta.servlet-api.version>
+
     <shading.prefix>org.apache.celeborn.shaded</shading.prefix>
 
     <maven.plugin.antrun.version>3.0.0</maven.plugin.antrun.version>
@@ -355,11 +361,6 @@
         <artifactId>protobuf-java</artifactId>
         <version>${protobuf.version}</version>
       </dependency>
-      <dependency>
-        <groupId>javax.servlet</groupId>
-        <artifactId>javax.servlet-api</artifactId>
-        <version>${javaxservlet.version}</version>
-      </dependency>
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
@@ -457,6 +458,105 @@
         <scope>test</scope>
       </dependency>
 
+      <!-- RESTful service dependencies -->
+      <dependency>
+        <groupId>jakarta.servlet</groupId>
+        <artifactId>jakarta.servlet-api</artifactId>
+        <version>${jakarta.servlet-api.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.glassfish.jersey.core</groupId>
+        <artifactId>jersey-server</artifactId>
+        <version>${jersey.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>jakarta.xml.bind</groupId>
+            <artifactId>jakarta.xml.bind-api</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>org.glassfish.jersey.containers</groupId>
+        <artifactId>jersey-container-servlet-core</artifactId>
+        <version>${jersey.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.glassfish.jersey.inject</groupId>
+        <artifactId>jersey-hk2</artifactId>
+        <version>${jersey.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.glassfish.jersey.media</groupId>
+        <artifactId>jersey-media-json-jackson</artifactId>
+        <version>${jersey.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.glassfish.jersey.media</groupId>
+        <artifactId>jersey-media-multipart</artifactId>
+        <version>${jersey.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.glassfish.jersey.test-framework</groupId>
+        <artifactId>jersey-test-framework-core</artifactId>
+        <version>${jersey.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>jakarta.activation</groupId>
+            <artifactId>jakarta.activation-api</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+        <artifactId>jersey-test-framework-provider-jetty</artifactId>
+        <version>${jersey.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-util</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-continuation</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>io.swagger.core.v3</groupId>
+        <artifactId>swagger-jaxrs2</artifactId>
+        <version>${swagger.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.sun.activation</groupId>
+            <artifactId>jakarta.activation</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.javassist</groupId>
+            <artifactId>javassist</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <!--
+      1. This library only contains swagger-ui static resource 
(.html/.css/.js/.png), for more detail, see
+      https://github.com/swagger-api/swagger-ui/blob/master/dist/
+      2. Note that when trying to upgrade swagger-ui, we should also update 
the version in the file(
+      
service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala).
+      -->
+      <dependency>
+        <groupId>org.webjars</groupId>
+        <artifactId>swagger-ui</artifactId>
+        <version>${swagger-ui.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 0cd12ac3e..906ca112b 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -45,7 +45,6 @@ object Dependencies {
   val findbugsVersion = "1.3.9"
   val guavaVersion = "32.1.3-jre"
   val hadoopVersion = "3.3.6"
-  val javaxServletVersion = "3.1.0"
   val junitInterfaceVersion = "0.13.3"
   // don't forget update `junitInterfaceVersion` when we upgrade junit
   val junitVersion = "4.13.2"
@@ -67,6 +66,11 @@ object Dependencies {
   val mybatisVersion = "3.5.15"
   val hikaricpVersion = "4.0.3"
   val h2Version = "2.2.224"
+  val swaggerVersion = "2.2.1"
+  val swaggerUiVersion = "4.9.1"
+  val jerseyVersion = "2.39.1"
+  val jettyVersion = "9.4.52.v20230823"
+  val jakartaServeletApiVersion = "4.0.4"
 
   // For SSL support
   val bouncycastleVersion = "1.77"
@@ -105,7 +109,6 @@ object Dependencies {
   val ioDropwizardMetricsJvm = "io.dropwizard.metrics" % "metrics-jvm" % 
metricsVersion
   val ioNetty = "io.netty" % "netty-all" % nettyVersion excludeAll(
     ExclusionRule("io.netty", "netty-handler-ssl-ocsp"))
-  val javaxServletApi = "javax.servlet" % "javax.servlet-api" % 
javaxServletVersion
   val leveldbJniAll = "org.fusesource.leveldbjni" % "leveldbjni-all" % 
leveldbJniVersion
   val log4j12Api = "org.apache.logging.log4j" % "log4j-1.2-api" % log4j2Version
   val log4jSlf4jImpl = "org.apache.logging.log4j" % "log4j-slf4j-impl" % 
log4j2Version
@@ -133,6 +136,23 @@ object Dependencies {
   val zstdJni = "com.github.luben" % "zstd-jni" % zstdJniVersion
   val mybatis = "org.mybatis" % "mybatis" % mybatisVersion
   val hikaricp = "com.zaxxer" % "HikariCP" % hikaricpVersion
+  val jettyServer = "org.eclipse.jetty" % "jetty-server" % jettyVersion 
excludeAll(
+    ExclusionRule("javax.servlet", "javax.servlet-api"))
+  val jettyServlet = "org.eclipse.jetty" % "jetty-servlet" % jettyVersion 
excludeAll(
+    ExclusionRule("javax.servlet", "javax.servlet-api"))
+  val jettyProxy = "org.eclipse.jetty" % "jetty-proxy" % jettyVersion
+  val jakartaServletApi = "jakarta.servlet" % "jakarta.servlet-api" % 
jakartaServeletApiVersion
+  val jerseyServer = "org.glassfish.jersey.core" % "jersey-server" % 
jerseyVersion excludeAll(
+    ExclusionRule("jakarta.xml.bind", "jakarta.xml.bind-api"))
+  val jerseyContainerServletCore = "org.glassfish.jersey.containers" % 
"jersey-container-servlet-core" % jerseyVersion
+  val jerseyHk2 = "org.glassfish.jersey.inject" % "jersey-hk2" % jerseyVersion
+  val jerseyMediaJsonJackson = "org.glassfish.jersey.media" % 
"jersey-media-json-jackson" % jerseyVersion
+  val jerseyMediaMultipart = "org.glassfish.jersey.media" % 
"jersey-media-multipart" % jerseyVersion
+  val swaggerJaxrs2 = "io.swagger.core.v3" % "swagger-jaxrs2" %swaggerVersion 
excludeAll(
+    ExclusionRule("com.sun.activation", "jakarta.activation"),
+    ExclusionRule("org.javassist", "javassist"),
+    ExclusionRule("jakarta.activation", "jakarta.activation-api"))
+  val swaggerUi = "org.webjars" % "swagger-ui" % swaggerUiVersion
 
   // Test dependencies
   // https://www.scala-sbt.org/1.x/docs/Testing.html
@@ -143,6 +163,10 @@ object Dependencies {
   val scalatestMockito = "org.mockito" %% "mockito-scala-scalatest" % 
scalatestMockitoVersion
   val scalatest = "org.scalatest" %% "scalatest" % scalatestVersion
   val h2 = "com.h2database" % "h2" % h2Version
+  val jerseyTestFrameworkCore = "org.glassfish.jersey.test-framework" % 
"jersey-test-framework-core" % jerseyVersion
+  val jerseyTestFrameworkProviderJetty = 
"org.glassfish.jersey.test-framework.providers" % 
"jersey-test-framework-provider-jetty" % jerseyVersion excludeAll(
+    ExclusionRule("org.eclipse.jetty", "jetty-util"),
+    ExclusionRule("org.eclipse.jetty", "jetty-continuation"))
 
   // SSL support
   val bouncycastleBcprovJdk18on = "org.bouncycastle" % "bcprov-jdk18on" % 
bouncycastleVersion % "test"
@@ -458,21 +482,34 @@ object CelebornService {
         Dependencies.findbugsJsr305,
         Dependencies.commonsIo,
         Dependencies.ioNetty,
-        Dependencies.javaxServletApi,
         Dependencies.commonsCrypto,
         Dependencies.slf4jApi,
         Dependencies.mybatis,
         Dependencies.hikaricp,
+        Dependencies.swaggerJaxrs2,
+        Dependencies.swaggerUi,
+        Dependencies.jakartaServletApi,
+        Dependencies.jerseyServer,
+        Dependencies.jerseyContainerServletCore,
+        Dependencies.jerseyHk2,
+        Dependencies.jerseyMediaJsonJackson,
+        Dependencies.jerseyMediaMultipart,
+        Dependencies.jettyServer,
+        Dependencies.jettyServlet,
+        Dependencies.jettyProxy,
         Dependencies.log4jSlf4jImpl % "test",
         Dependencies.log4j12Api % "test",
-        Dependencies.h2 % "test"
+        Dependencies.h2 % "test",
+        Dependencies.jerseyTestFrameworkCore % "test",
+        Dependencies.jerseyTestFrameworkProviderJetty % "test"
       ) ++ commonUnitTestDependencies
     )
 }
 
 object CelebornMaster {
   lazy val master = Project("celeborn-master", file("master"))
-    .dependsOn(CelebornCommon.common, CelebornService.service)
+    .dependsOn(CelebornCommon.common)
+    .dependsOn(CelebornService.service % "test->test;compile->compile")
     .settings (
       commonSettings,
       protoSettings,
@@ -497,6 +534,7 @@ object CelebornWorker {
   lazy val worker = Project("celeborn-worker", file("worker"))
     .dependsOn(CelebornService.service)
     .dependsOn(CelebornCommon.common % "test->test;compile->compile")
+    .dependsOn(CelebornService.service % "test->test;compile->compile")
     .dependsOn(CelebornClient.client % "test->compile")
     .dependsOn(CelebornMaster.master % "test->compile")
     .settings (
@@ -516,7 +554,9 @@ object CelebornWorker {
         Dependencies.leveldbJniAll,
         Dependencies.roaringBitmap,
         Dependencies.rocksdbJni,
-        Dependencies.scalatestMockito % "test"
+        Dependencies.scalatestMockito % "test",
+        Dependencies.jerseyTestFrameworkCore % "test",
+        Dependencies.jerseyTestFrameworkProviderJetty % "test"
       ) ++ commonUnitTestDependencies
     )
 }
@@ -724,7 +764,9 @@ trait SparkClientProjects {
         libraryDependencies ++= Seq(
           "org.apache.spark" %% "spark-core" % sparkVersion % "test",
           "org.apache.spark" %% "spark-sql" % sparkVersion % "test",
-          "org.apache.spark" %% "spark-core" % sparkVersion % "test" 
classifier "tests",
+          "org.apache.spark" %% "spark-core" % sparkVersion % "test" 
classifier "tests" excludeAll(
+            ExclusionRule("org.glassfish.jersey.inject", "*"),
+            ExclusionRule("org.glassfish.jersey.core", "*")),
           "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier 
"tests"
         ) ++ commonUnitTestDependencies
       )
diff --git a/service/pom.xml b/service/pom.xml
index faf80a142..b6d0da739 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -29,6 +29,36 @@
   <packaging>jar</packaging>
   <name>Celeborn Service</name>
 
+  <dependencyManagement>
+    <dependencies>
+      <!-- RESTful service dependencies, place here to prevent impacting sbt 
mr dependencies -->
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-server</artifactId>
+        <version>${jetty.version}</version>
+        <exclusions>
+          <!--
+            Use `jakarta.servlet-api` instead.
+            -->
+          <exclusion>
+            <groupId>javax.servlet</groupId>
+            <artifactId>javax.servlet-api</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-servlet</artifactId>
+        <version>${jetty.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-proxy</artifactId>
+        <version>${jetty.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.celeborn</groupId>
@@ -43,10 +73,6 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
     </dependency>
-    <dependency>
-      <groupId>javax.servlet</groupId>
-      <artifactId>javax.servlet-api</artifactId>
-    </dependency>
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
@@ -75,6 +101,62 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- RESTful dependencies -->
+    <dependency>
+      <groupId>io.swagger.core.v3</groupId>
+      <artifactId>swagger-jaxrs2</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.webjars</groupId>
+      <artifactId>swagger-ui</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>jakarta.servlet</groupId>
+      <artifactId>jakarta.servlet-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.glassfish.jersey.core</groupId>
+      <artifactId>jersey-server</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.glassfish.jersey.containers</groupId>
+      <artifactId>jersey-container-servlet-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.glassfish.jersey.inject</groupId>
+      <artifactId>jersey-hk2</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.glassfish.jersey.media</groupId>
+      <artifactId>jersey-media-json-jackson</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.glassfish.jersey.media</groupId>
+      <artifactId>jersey-media-multipart</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-proxy</artifactId>
+    </dependency>
+
     <!-- Test dependencies -->
     <dependency>
       <groupId>org.mockito</groupId>
@@ -91,5 +173,15 @@
       <artifactId>log4j-1.2-api</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.test-framework</groupId>
+      <artifactId>jersey-test-framework-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+      <artifactId>jersey-test-framework-provider-jetty</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/service/src/main/resources/org/apache/celeborn/swagger/index.html 
b/service/src/main/resources/org/apache/celeborn/swagger/index.html
new file mode 100644
index 000000000..2e0c4c687
--- /dev/null
+++ b/service/src/main/resources/org/apache/celeborn/swagger/index.html
@@ -0,0 +1,73 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <meta charset="UTF-8">
+    <title>Apache Celeborn(Incubating) REST API Documentation</title>
+        <link rel="stylesheet" type="text/css" 
href="../swagger-static/swagger-ui.css" />
+        <link rel="icon" type="image/png" 
href="../swagger-static/favicon-32x32.png" sizes="32x32" />
+        <link rel="icon" type="image/png" 
href="../swagger-static/favicon-16x16.png" sizes="16x16" />
+    <style>
+      html
+      {
+        box-sizing: border-box;
+        overflow: -moz-scrollbars-vertical;
+        overflow-y: scroll;
+      }
+
+      *,
+      *:before,
+      *:after
+      {
+        box-sizing: inherit;
+      }
+
+      body
+      {
+        margin:0;
+        background: #fafafa;
+      }
+    </style>
+  </head>
+
+               <div id="swagger-ui"></div>
+
+    <script src="../swagger-static/swagger-ui-bundle.js" charset="UTF-8"> 
</script>
+    <script src="../swagger-static/swagger-ui-standalone-preset.js" 
charset="UTF-8"> </script>
+    <script>
+    window.onload = function() {
+      // Begin Swagger UI call region
+      window.ui = SwaggerUIBundle({
+                               url: location.origin + "/openapi.json",
+                               dom_id: '#swagger-ui',
+                               deepLinking: true,
+                               presets: [
+                                       SwaggerUIBundle.presets.apis,
+                                       SwaggerUIStandalonePreset
+                               ],
+                               plugins: [
+                                       SwaggerUIBundle.plugins.DownloadUrl
+                               ],
+                               layout: "StandaloneLayout"
+                       });
+                       // End Swagger UI call region
+               };
+  </script>
+  </body>
+</html>
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala 
b/service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
similarity index 96%
rename from 
common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
rename to 
service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
index 82311bea9..ab919bb14 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
@@ -25,11 +25,12 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.matching.Regex
 
 import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
+import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.CelebornConf.{METRICS_JSON_PATH, 
METRICS_PROMETHEUS_PATH}
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.metrics.sink.{JsonServlet, 
PrometheusServlet, ServletHttpRequestHandler, Sink}
+import org.apache.celeborn.common.metrics.sink.{JsonServlet, 
PrometheusServlet, Sink}
 import org.apache.celeborn.common.metrics.source.Source
 import org.apache.celeborn.common.util.Utils
 
@@ -51,7 +52,7 @@ class MetricsSystem(
 
   metricsConfig.initialize()
 
-  def getServletHandlers: Array[ServletHttpRequestHandler] = {
+  def getServletContextHandlers: Array[ServletContextHandler] = {
     require(running, "Can only call getServletHandlers on a running 
MetricsSystem")
     prometheusServlet.map(_.getHandlers(conf)).getOrElse(Array()) ++
       jsonServlet.map(_.getHandlers(conf)).getOrElse(Array())
@@ -139,7 +140,7 @@ class MetricsSystem(
             prometheusServlet = Some(servlet.newInstance(
               kv._2,
               registry,
-              sources.asScala,
+              sources.asScala.toSeq,
               prometheusServletPath).asInstanceOf[PrometheusServlet])
           } else if (kv._1 == "jsonServlet") {
             val servlet = Utils.classForName(classPath)
@@ -152,7 +153,7 @@ class MetricsSystem(
             jsonServlet = Some(servlet.newInstance(
               kv._2,
               registry,
-              sources.asScala,
+              sources.asScala.toSeq,
               jsonServletPath,
               
conf.metricsJsonPrettyEnabled.asInstanceOf[Object]).asInstanceOf[JsonServlet])
           } else {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
 
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
similarity index 77%
rename from 
common/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
rename to 
service/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
index 325826164..9b9ba7e12 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
@@ -16,17 +16,19 @@
  */
 package org.apache.celeborn.common.metrics.sink
 
+import org.eclipse.jetty.servlet.ServletContextHandler
+
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.metrics.source.Source
 
 abstract class AbstractServlet(sources: Seq[Source]) extends Sink with Logging 
{
-  def getHandlers(conf: CelebornConf): Array[ServletHttpRequestHandler] = {
-    Array[ServletHttpRequestHandler](
-      createHttpRequestHandler())
+  def getHandlers(conf: CelebornConf): Array[ServletContextHandler] = {
+    Array[ServletContextHandler](
+      createServletHandler())
   }
 
-  def createHttpRequestHandler(): ServletHttpRequestHandler
+  def createServletHandler(): ServletContextHandler
 
   def getMetricsSnapshot: String = {
     sources.map(_.getMetrics).mkString
@@ -38,11 +40,3 @@ abstract class AbstractServlet(sources: Seq[Source]) extends 
Sink with Logging {
 
   override def report(): Unit = {}
 }
-
-abstract class ServletHttpRequestHandler(path: String) extends Logging {
-
-  def handleRequest(uri: String): String
-
-  def getServletPath(): String = path
-
-}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
 
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
similarity index 95%
rename from 
common/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
rename to 
service/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
index 7a2b8b52c..4ed2c5a3a 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
@@ -24,10 +24,12 @@ import scala.collection.mutable.ArrayBuffer
 import com.codahale.metrics.MetricRegistry
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, 
DefaultScalaModule}
-import io.netty.channel.ChannelHandler.Sharable
+import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.celeborn.common.metrics.{CelebornHistogram, CelebornTimer, 
ResettableSlidingWindowReservoir}
-import org.apache.celeborn.common.metrics.source.{AbstractSource, 
NamedCounter, NamedGauge, NamedHistogram, NamedTimer, Source}
+import org.apache.celeborn.common.metrics.source._
+import org.apache.celeborn.server.common.http.HttpUtils
+import org.apache.celeborn.server.common.http.HttpUtils.ServletParams
 
 object JsonConverter {
   val mapper = new ObjectMapper() with ClassTagExtensions
@@ -72,8 +74,10 @@ class JsonServlet(
     }
   }
 
-  override def createHttpRequestHandler(): ServletHttpRequestHandler = {
-    new JsonHttpRequestHandler(servletPath, this)
+  override def createServletHandler(): ServletContextHandler = {
+    HttpUtils.createServletHandler(
+      servletPath,
+      new ServletParams(_ => getMetricsSnapshot, "text/json"))
   }
 
   override def stop(): Unit = {}
@@ -344,12 +348,3 @@ class JsonServlet(
     }
   }
 }
-
-@Sharable
-class JsonHttpRequestHandler(path: String, jsonServlet: JsonServlet)
-  extends ServletHttpRequestHandler(path) {
-
-  override def handleRequest(uri: String): String = {
-    jsonServlet.getMetricsSnapshot
-  }
-}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
 
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
similarity index 73%
rename from 
common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
rename to 
service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
index 4b4548421..27797c8fc 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
@@ -20,9 +20,11 @@ package org.apache.celeborn.common.metrics.sink
 import java.util.Properties
 
 import com.codahale.metrics.MetricRegistry
-import io.netty.channel.ChannelHandler.Sharable
+import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.celeborn.common.metrics.source.Source
+import org.apache.celeborn.server.common.http.HttpUtils
+import org.apache.celeborn.server.common.http.HttpUtils.ServletParams
 
 class PrometheusServlet(
     val property: Properties,
@@ -30,17 +32,9 @@ class PrometheusServlet(
     val sources: Seq[Source],
     val servletPath: String) extends AbstractServlet(sources) {
 
-  override def createHttpRequestHandler(): ServletHttpRequestHandler = {
-    new PrometheusHttpRequestHandler(servletPath, this)
-  }
-}
-
-@Sharable
-class PrometheusHttpRequestHandler(
-    path: String,
-    prometheusServlet: PrometheusServlet) extends 
ServletHttpRequestHandler(path) {
-
-  override def handleRequest(uri: String): String = {
-    prometheusServlet.getMetricsSnapshot
+  override def createServletHandler(): ServletContextHandler = {
+    HttpUtils.createServletHandler(
+      servletPath,
+      new ServletParams(_ => getMetricsSnapshot, "text/plain"))
   }
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index f13752b81..18e016187 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -23,7 +23,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.server.common.http.{HttpRequestHandler, HttpServer, 
HttpServerInitializer}
+import org.apache.celeborn.server.common.http.HttpServer
+import org.apache.celeborn.server.common.http.api.ApiRootResource
 import org.apache.celeborn.server.common.service.config.ConfigLevel
 
 abstract class HttpService extends Service with Logging {
@@ -173,18 +174,20 @@ abstract class HttpService extends Service with Logging {
   def getWorkerEventInfo(): String = throw new UnsupportedOperationException()
 
   def startHttpServer(): Unit = {
-    val handlers =
-      if (metricsSystem.running) {
-        new HttpRequestHandler(this, metricsSystem.getServletHandlers)
-      } else {
-        new HttpRequestHandler(this, null)
-      }
-    httpServer = new HttpServer(
+    httpServer = HttpServer(
       serviceName,
       httpHost(),
       httpPort(),
-      new HttpServerInitializer(handlers))
+      httpMaxWorkerThreads(),
+      httpStopTimeout())
     httpServer.start()
+    startInternal()
+    // block until the HTTP server is started, otherwise, we may get
+    // the wrong HTTP server port -1
+    while (httpServer.getState != "STARTED") {
+      logInfo(s"Waiting for $serviceName's HTTP server getting started")
+      Thread.sleep(1000)
+    }
   }
 
   private def httpHost(): String = {
@@ -205,6 +208,41 @@ abstract class HttpService extends Service with Logging {
     }
   }
 
+  private def httpMaxWorkerThreads(): Int = {
+    serviceName match {
+      case Service.MASTER =>
+        conf.masterHttpMaxWorkerThreads
+      case Service.WORKER =>
+        conf.workerHttpMaxWorkerThreads
+    }
+  }
+
+  private def httpStopTimeout(): Long = {
+    serviceName match {
+      case Service.MASTER =>
+        conf.masterHttpStopTimeout
+      case Service.WORKER =>
+        conf.workerHttpStopTimeout
+    }
+  }
+
+  def connectionUrl: String = {
+    httpServer.getServerUri
+  }
+
+  protected def startInternal(): Unit = {
+    httpServer.addHandler(ApiRootResource.getServletHandler(this))
+    
httpServer.addStaticHandler("META-INF/resources/webjars/swagger-ui/4.9.1/", 
"/swagger-static/")
+    httpServer.addStaticHandler("org/apache/celeborn/swagger", "/swagger")
+    httpServer.addRedirectHandler("/help", "/swagger")
+    httpServer.addRedirectHandler("/docs", "/swagger")
+    if (metricsSystem.running) {
+      metricsSystem.getServletContextHandlers.foreach { handler =>
+        httpServer.addHandler(handler)
+      }
+    }
+  }
+
   override def initialize(): Unit = {
     super.initialize()
     startHttpServer()
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
deleted file mode 100644
index 778355873..000000000
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.server.common.http
-
-import org.apache.celeborn.server.common.{HttpService, Service}
-import org.apache.celeborn.server.common.service.config.ConfigLevel
-
-/**
- * HTTP endpoints of Rest API providers.
- */
-trait HttpEndpoint {
-  def path: String
-
-  def description(service: String): String
-
-  def handle(service: HttpService, parameters: Map[String, String]): String
-}
-
-case object Conf extends HttpEndpoint {
-  override def path: String = "/conf"
-
-  override def description(service: String): String = s"List the conf setting 
of the $service."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getConf
-}
-
-case object ListDynamicConfigs extends HttpEndpoint {
-  override def path: String = "/listDynamicConfigs"
-
-  override def description(service: String): String = s"List the dynamic 
configs of the $service. " +
-    s"The parameter level specifies the config level of dynamic configs. " +
-    s"The parameter tenant specifies the tenant id of 
${ConfigLevel.TENANT.name()} or ${ConfigLevel.TENANT_USER.name()} level. " +
-    s"The parameter name specifies the user name of 
${ConfigLevel.TENANT_USER.name()} level. " +
-    s"Meanwhile, either none or all of the parameter tenant and name are 
specified for ${ConfigLevel.TENANT_USER.name()} level."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getDynamicConfigs(
-      parameters.getOrElse("LEVEL", "").trim,
-      parameters.getOrElse("TENANT", "").trim,
-      parameters.getOrElse("NAME", "").trim)
-}
-
-case object WorkerInfo extends HttpEndpoint {
-  override def path: String = "/workerInfo"
-
-  override def description(service: String): String = {
-    if (service == Service.MASTER)
-      "List worker information of the service. It will list all registered 
workers 's information."
-    else "List the worker information of the worker."
-  }
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getWorkerInfo
-}
-
-case object ThreadDump extends HttpEndpoint {
-  override def path: String = "/threadDump"
-
-  override def description(service: String): String =
-    s"List the current thread dump of the $service."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getThreadDump
-}
-
-case object Shuffles extends HttpEndpoint {
-  override def path: String = "/shuffles"
-
-  override def description(service: String): String = {
-    if (service == Service.MASTER)
-      "List all running shuffle keys of the service. It will return all 
running shuffle's key of the cluster."
-    else
-      "List all the running shuffle keys of the worker. It only return keys of 
shuffles running in that worker."
-  }
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getShuffleList
-}
-
-case object Applications extends HttpEndpoint {
-  override def path: String = "/applications"
-
-  override def description(service: String): String =
-    if (service == Service.MASTER)
-      "List all running application's ids of the cluster."
-    else
-      "List all running application's ids of the worker. It only return 
application ids running in that worker."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getApplicationList
-}
-
-case object ListTopDiskUsedApps extends HttpEndpoint {
-  override def path: String = "/listTopDiskUsedApps"
-
-  override def description(service: String): String = {
-    if (service == Service.MASTER)
-      "List the top disk usage application ids. It will return the top disk 
usage application ids for the cluster."
-    else
-      "List the top disk usage application ids. It only return application ids 
running in that worker."
-  }
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.listTopDiskUseApps
-}
-
-case object Help extends HttpEndpoint {
-  override def path: String = "/help"
-
-  override def description(service: String): String =
-    s"List the available API providers of the $service."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    HttpUtils.help(service.serviceName)
-}
-
-case object Invalid extends HttpEndpoint {
-
-  val invalid = "invalid"
-
-  override def path: String = None.toString
-
-  override def description(service: String): String = s"Invalid uri of the 
$service."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String = invalid
-}
-
-case object MasterGroupInfo extends HttpEndpoint {
-  override def path: String = "/masterGroupInfo"
-
-  override def description(service: String): String =
-    "List master group information of the service. It will list all master's 
LEADER, FOLLOWER information."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getMasterGroupInfo
-}
-
-case object LostWorkers extends HttpEndpoint {
-  override def path: String = "/lostWorkers"
-
-  override def description(service: String): String = "List all lost workers 
of the master."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getLostWorkers
-}
-
-case object ExcludedWorkers extends HttpEndpoint {
-  override def path: String = "/excludedWorkers"
-
-  override def description(service: String): String = "List all excluded 
workers of the master."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getExcludedWorkers
-}
-
-case object ShutdownWorkers extends HttpEndpoint {
-  override def path: String = "/shutdownWorkers"
-
-  override def description(service: String): String = "List all shutdown 
workers of the master."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getShutdownWorkers
-}
-
-case object Hostnames extends HttpEndpoint {
-  override def path: String = "/hostnames"
-
-  override def description(service: String): String =
-    "List all running application's LifecycleManager's hostnames of the 
cluster."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getHostnameList
-}
-
-case object Exclude extends HttpEndpoint {
-  override def path: String = "/exclude"
-
-  override def description(service: String): String =
-    "Excluded workers of the master add or remove the worker manually given 
worker id. The parameter add or remove specifies the excluded workers to add or 
remove, which value is separated by commas."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.exclude(parameters.getOrElse("ADD", "").trim, 
parameters.getOrElse("REMOVE", "").trim)
-}
-
-case object ListPartitionLocationInfo extends HttpEndpoint {
-  override def path: String = "/listPartitionLocationInfo"
-
-  override def description(service: String): String =
-    "List all the living PartitionLocation information in that worker."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.listPartitionLocationInfo
-}
-
-case object UnavailablePeers extends HttpEndpoint {
-  override def path: String = "/unavailablePeers"
-
-  override def description(service: String): String =
-    "List the unavailable peers of the worker, this always means the worker 
connect to the peer failed."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getUnavailablePeers
-}
-
-case object IsShutdown extends HttpEndpoint {
-  override def path: String = "/isShutdown"
-
-  override def description(service: String): String =
-    "Show if the worker is during the process of shutdown."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.isShutdown
-}
-
-case object IsRegistered extends HttpEndpoint {
-  override def path: String = "/isRegistered"
-
-  override def description(service: String): String =
-    "Show if the worker is registered to the master success."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.isRegistered
-}
-
-case object Exit extends HttpEndpoint {
-  override def path: String = "/exit"
-
-  override def description(service: String): String =
-    "Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL' 
and 'IMMEDIATELY'."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.exit(parameters.getOrElse("TYPE", ""))
-}
-
-case object SendWorkerEvent extends HttpEndpoint {
-  override def path: String = "/sendWorkerEvent"
-
-  override def description(service: String): String =
-    "For Master(Leader) can send worker event to manager workers. Legal types 
are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 
'Recommission'"
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.handleWorkerEvent(parameters.getOrElse("TYPE", ""), 
parameters.getOrElse("WORKERS", ""))
-}
-
-case object WorkerEventInfo extends HttpEndpoint {
-  override def path: String = "/workerEventInfo"
-
-  override def description(service: String): String =
-    "List all worker event infos of the master."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getWorkerEventInfo()
-}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
deleted file mode 100644
index 531f19454..000000000
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.server.common.http
-
-import io.netty.buffer.Unpooled
-import io.netty.channel.{ChannelFutureListener, ChannelHandlerContext, 
SimpleChannelInboundHandler}
-import io.netty.channel.ChannelHandler.Sharable
-import io.netty.handler.codec.http._
-import io.netty.util.CharsetUtil
-
-import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.metrics.sink.{JsonHttpRequestHandler, 
ServletHttpRequestHandler}
-import org.apache.celeborn.server.common.HttpService
-
-/**
- * A handler for the REST API that defines how to handle the HTTP request 
given a message.
- *
- * @param service The service of HTTP server.
- * @param uri The uri of HTTP request.
- */
-@Sharable
-class HttpRequestHandler(
-    service: HttpService,
-    servletHttpRequestHandlers: Array[ServletHttpRequestHandler])
-  extends SimpleChannelInboundHandler[FullHttpRequest] with Logging {
-
-  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
-    ctx.flush()
-  }
-
-  override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): 
Unit = {
-    val uri = req.uri()
-    val (path, parameters) = HttpUtils.parseUri(uri)
-    val msg = HttpUtils.handleRequest(service, path, parameters)
-    val textType = "text/plain; charset=UTF-8"
-    val jsonType = "application/json"
-    val (response, contentType) = msg match {
-      case Invalid.invalid =>
-        if (servletHttpRequestHandlers != null) {
-          servletHttpRequestHandlers.find(servlet =>
-            uri == servlet.getServletPath()).map {
-            case jsonHandler: JsonHttpRequestHandler =>
-              (jsonHandler.handleRequest(uri), jsonType)
-            case handler: ServletHttpRequestHandler =>
-              (handler.handleRequest(uri), textType)
-          }.getOrElse((s"Unknown path $uri!", textType))
-        } else {
-          (
-            s"${Invalid.description(service.serviceName)} 
${HttpUtils.help(service.serviceName)}",
-            textType)
-        }
-      case _ => (msg, textType)
-    }
-
-    val res = new DefaultFullHttpResponse(
-      HttpVersion.HTTP_1_1,
-      HttpResponseStatus.OK,
-      Unpooled.copiedBuffer(response, CharsetUtil.UTF_8))
-    res.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType)
-    ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE)
-  }
-}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
index a45ff109a..336132576 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
@@ -17,75 +17,108 @@
 
 package org.apache.celeborn.server.common.http
 
-import java.net.InetSocketAddress
-import java.util.concurrent.TimeUnit
-
-import io.netty.bootstrap.ServerBootstrap
-import io.netty.channel.{ChannelFuture, ChannelInitializer}
-import io.netty.channel.socket.nio.NioServerSocketChannel
-import io.netty.handler.logging.{LoggingHandler, LogLevel}
+import org.apache.commons.lang3.SystemUtils
+import org.eclipse.jetty.server.{Handler, HttpConfiguration, 
HttpConnectionFactory, Server, ServerConnector}
+import org.eclipse.jetty.server.handler.{ContextHandlerCollection, 
ErrorHandler}
+import org.eclipse.jetty.util.component.LifeCycle
+import org.eclipse.jetty.util.thread.{QueuedThreadPool, 
ScheduledExecutorScheduler}
 
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.network.util.{IOMode, NettyUtils}
-import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.common.util.CelebornExitKind
 
-class HttpServer(
+private[celeborn] case class HttpServer(
     role: String,
-    host: String,
-    port: Int,
-    channelInitializer: ChannelInitializer[_]) extends Logging {
+    server: Server,
+    connector: ServerConnector,
+    rootHandler: ContextHandlerCollection) extends Logging {
 
-  private var bootstrap: ServerBootstrap = _
-  private var bindFuture: ChannelFuture = _
   @volatile private var isStarted = false
 
   @throws[Exception]
   def start(): Unit = synchronized {
-    val boss = NettyUtils.createEventLoop(IOMode.NIO, 1, role + "-http-boss")
-    val worker = NettyUtils.createEventLoop(IOMode.NIO, 2, role + 
"-http-worker")
-    bootstrap = new ServerBootstrap
-    bootstrap
-      .group(boss, worker)
-      .handler(new LoggingHandler(LogLevel.DEBUG))
-      .channel(classOf[NioServerSocketChannel])
-      .childHandler(channelInitializer)
-
-    val address = new InetSocketAddress(host, port)
-    bindFuture = bootstrap.bind(address).sync
-    bindFuture.syncUninterruptibly()
-    logInfo(s"$role: HttpServer started on ${address.getHostString}:$port.")
-    isStarted = true
+    try {
+      server.start()
+      connector.start()
+      server.addConnector(connector)
+      logInfo(s"$role: HttpServer started on 
${connector.getHost}:${connector.getPort}.")
+      isStarted = true
+    } catch {
+      case e: Exception =>
+        stop(CelebornExitKind.EXIT_IMMEDIATELY)
+        throw e
+    }
   }
 
   def stop(exitCode: Int): Unit = synchronized {
     if (isStarted) {
-      logInfo(s"$role: Stopping HttpServer")
-      if (bindFuture != null) {
-        // close is a local operation and should finish within milliseconds; 
timeout just to be safe
-        bindFuture.channel.close.awaitUninterruptibly(10, TimeUnit.SECONDS)
-        bindFuture = null
+      if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) {
+        server.setStopTimeout(0)
       }
-      if (bootstrap != null && bootstrap.config.group != null) {
-        Utils.tryLogNonFatalError {
-          if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
-            bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
-          } else {
-            bootstrap.config.group.shutdownGracefully(0, 0, TimeUnit.SECONDS)
-          }
-        }
-      }
-      if (bootstrap != null && bootstrap.config.childGroup != null) {
-        Utils.tryLogNonFatalError {
-          if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
-            bootstrap.config.childGroup.shutdownGracefully(3, 5, 
TimeUnit.SECONDS)
-          } else {
-            bootstrap.config.childGroup.shutdownGracefully(0, 0, 
TimeUnit.SECONDS)
-          }
-        }
+      logInfo(s"$role: Stopping HttpServer")
+      server.stop()
+      connector.stop()
+      server.getThreadPool match {
+        case lifeCycle: LifeCycle => lifeCycle.stop()
+        case _ =>
       }
-      bootstrap = null
       logInfo(s"$role: HttpServer stopped.")
       isStarted = false
     }
   }
+  def getServerUri: String = connector.getHost + ":" + connector.getLocalPort
+
+  def addHandler(handler: Handler): Unit = synchronized {
+    rootHandler.addHandler(handler)
+    if (!handler.isStarted) handler.start()
+  }
+
+  def addStaticHandler(
+      resourceBase: String,
+      contextPath: String): Unit = {
+    addHandler(HttpUtils.createStaticHandler(resourceBase, contextPath))
+  }
+
+  def addRedirectHandler(
+      src: String,
+      dest: String): Unit = {
+    addHandler(HttpUtils.createRedirectHandler(src, dest))
+  }
+
+  def getState: String = server.getState
+}
+
+object HttpServer {
+
+  def apply(role: String, host: String, port: Int, poolSize: Int, stopTimeout: 
Long): HttpServer = {
+    val pool = new QueuedThreadPool(poolSize)
+    pool.setName(s"$role-JettyThreadPool")
+    pool.setDaemon(true)
+    val server = new Server(pool)
+    server.setStopTimeout(stopTimeout)
+
+    val errorHandler = new ErrorHandler()
+    errorHandler.setShowStacks(true)
+    errorHandler.setServer(server)
+    server.addBean(errorHandler)
+
+    val collection = new ContextHandlerCollection
+    server.setHandler(collection)
+
+    val serverExecutor = new 
ScheduledExecutorScheduler(s"$role-JettyScheduler", true)
+    val httpConf = new HttpConfiguration()
+    val connector = new ServerConnector(
+      server,
+      null,
+      serverExecutor,
+      null,
+      -1,
+      -1,
+      new HttpConnectionFactory(httpConf))
+    connector.setHost(host)
+    connector.setPort(port)
+    connector.setReuseAddress(!SystemUtils.IS_OS_WINDOWS)
+    connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8))
+
+    new HttpServer(role, server, connector, collection)
+  }
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
index 4ccb127e5..b9b6a3ec2 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
@@ -18,78 +18,111 @@
 package org.apache.celeborn.server.common.http
 
 import java.net.URL
-import java.util.Locale
-
-import org.apache.celeborn.server.common.{HttpService, Service}
-
-object HttpUtils {
-
-  private val baseEndpoints: List[HttpEndpoint] =
-    List(
-      Conf,
-      ListDynamicConfigs,
-      WorkerInfo,
-      ThreadDump,
-      Shuffles,
-      Applications,
-      ListTopDiskUsedApps,
-      Help)
-  private val masterEndpoints: List[HttpEndpoint] = List(
-    MasterGroupInfo,
-    LostWorkers,
-    ExcludedWorkers,
-    ShutdownWorkers,
-    Hostnames,
-    SendWorkerEvent,
-    WorkerEventInfo,
-    Exclude) ++ baseEndpoints
-  private val workerEndpoints: List[HttpEndpoint] =
-    List(
-      ListPartitionLocationInfo,
-      UnavailablePeers,
-      IsShutdown,
-      IsRegistered,
-      Exit) ++ baseEndpoints
-
-  def parseUri(uri: String): (String, Map[String, String]) = {
-    val url = new URL(s"https://127.0.0.1:9000$uri";)
-    val parameter =
-      if (url.getQuery == null) {
-        Map.empty[String, String]
-      } else {
-        url.getQuery
-          .split("&")
-          .map(_.split("="))
-          .map(arr => arr(0).toUpperCase(Locale.ROOT) -> arr(1)).toMap
+import javax.servlet.http.{HttpServlet, HttpServletRequest, 
HttpServletResponse}
+
+import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, 
ServletHolder}
+
+import org.apache.celeborn.common.exception.CelebornException
+import org.apache.celeborn.common.internal.Logging
+
+private[celeborn] object HttpUtils extends Logging {
+  // Base type for a function that returns something based on an HTTP request. 
Allows for
+  // implicit conversion from many types of functions to jetty Handlers.
+  type Responder[T] = HttpServletRequest => T
+
+  class ServletParams[T <: AnyRef](
+      val responder: Responder[T],
+      val contentType: String,
+      val extractFn: T => String = (in: Any) => in.toString) {}
+
+  /** Create a context handler that responds to a request with the given path 
prefix */
+  def createServletHandler[T <: AnyRef](
+      path: String,
+      servletParams: ServletParams[T]): ServletContextHandler = {
+    createServletHandler(path, createServlet(servletParams))
+  }
+
+  private def createServlet[T <: AnyRef](servletParams: ServletParams[T]): 
HttpServlet = {
+    new HttpServlet {
+      override def doGet(request: HttpServletRequest, response: 
HttpServletResponse): Unit = {
+        try {
+          
response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
+          response.setStatus(HttpServletResponse.SC_OK)
+          val result = servletParams.responder(request)
+          response.getWriter.print(servletParams.extractFn(result))
+        } catch {
+          case e: IllegalArgumentException =>
+            response.sendError(HttpServletResponse.SC_BAD_REQUEST, 
e.getMessage)
+          case e: Exception =>
+            logWarning(s"GET ${request.getRequestURI} failed: $e", e)
+            throw e
+        }
       }
-    (url.getPath, parameter)
+
+      override protected def doTrace(req: HttpServletRequest, res: 
HttpServletResponse): Unit = {
+        res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
+      }
+    }
   }
 
-  def handleRequest(
-      service: HttpService,
-      path: String,
-      parameters: Map[String, String]): String = {
-    endpoints(service.serviceName).find(endpoint => endpoint.path == 
path).orElse(
-      Some(Invalid)).get.handle(
-      service,
-      parameters)
+  /**
+   * Create a handler for serving files from a static directory
+   *
+   * @param resourceBase the resource directory contains static resource files
+   * @param contextPath the content path to set for the handler
+   * @return a static [[ServletContextHandler]]
+   */
+  def createStaticHandler(
+      resourceBase: String,
+      contextPath: String): ServletContextHandler = {
+    val contextHandler = new ServletContextHandler()
+    val holder = new ServletHolder(classOf[DefaultServlet])
+    
Option(Thread.currentThread().getContextClassLoader.getResource(resourceBase)) 
match {
+      case Some(res) =>
+        holder.setInitParameter("resourceBase", res.toString)
+      case None =>
+        throw new CelebornException("Could not find resource path for Web UI: 
" + resourceBase)
+    }
+    contextHandler.setContextPath(contextPath)
+    contextHandler.addServlet(holder, "/")
+    contextHandler
   }
 
-  def help(service: String): String = {
-    val sb = new StringBuilder
-    sb.append("Available API providers include:\n")
-    val httpEndpoints: List[HttpEndpoint] = endpoints(service)
-    val maxLength = httpEndpoints.map(_.path.length).max
-    httpEndpoints.sortBy(_.path).foreach(endpoint =>
-      sb.append(
-        s"${endpoint.path.padTo(maxLength, " ").mkString} 
${endpoint.description(service)}\n"))
-    sb.toString
+  def createServletHandler(contextPath: String, servlet: HttpServlet): 
ServletContextHandler = {
+    val handler = new ServletContextHandler()
+    val holder = new ServletHolder(servlet)
+    handler.setContextPath(contextPath)
+    handler.addServlet(holder, "/")
+    handler
   }
 
-  private def endpoints(service: String): List[HttpEndpoint] = {
-    if (service == Service.MASTER)
-      masterEndpoints
-    else
-      workerEndpoints
+  def createRedirectHandler(src: String, dest: String): ServletContextHandler 
= {
+    val redirectedServlet = new HttpServlet {
+      private def doReq(req: HttpServletRequest, resp: HttpServletResponse): 
Unit = {
+        val newURL = new URL(new URL(req.getRequestURL.toString), 
dest).toString
+        resp.sendRedirect(newURL)
+      }
+      override def doGet(req: HttpServletRequest, resp: HttpServletResponse): 
Unit = {
+        doReq(req, resp)
+      }
+
+      override def doPut(req: HttpServletRequest, resp: HttpServletResponse): 
Unit = {
+        doReq(req, resp)
+      }
+
+      override def doPost(req: HttpServletRequest, resp: HttpServletResponse): 
Unit = {
+        doReq(req, resp)
+      }
+
+      override def doDelete(req: HttpServletRequest, resp: 
HttpServletResponse): Unit = {
+        doReq(req, resp)
+      }
+
+      override protected def doTrace(req: HttpServletRequest, resp: 
HttpServletResponse): Unit = {
+        resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
+      }
+    }
+
+    createServletHandler(src, redirectedServlet)
   }
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiBaseResource.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiBaseResource.scala
new file mode 100644
index 000000000..45a5d2796
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiBaseResource.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.api
+
+import javax.ws.rs.{GET, Path, Produces, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.responses.ApiResponse
+
+@Path("/")
+private[api] class ApiBaseResource extends ApiRequestContext {
+  def service: String = httpService.serviceName
+
+  @GET
+  @Path("ping")
+  @Produces(Array(MediaType.TEXT_PLAIN))
+  def ping(): String = "pong"
+
+  @Path("/conf")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List the conf setting.")
+  @GET
+  def conf: String = httpService.getConf
+
+  @Path("/listDynamicConfigs")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List the dynamic configs. " +
+      "The parameter level specifies the config level of dynamic configs. " +
+      "The parameter tenant specifies the tenant id of TENANT or TENANT_USER 
level. " +
+      "The parameter name specifies the user name of TENANT_USER level. " +
+      "Meanwhile, either none or all of the parameter tenant and name are 
specified for TENANT_USER level.")
+  @GET
+  def listDynamicConfigs(
+      @QueryParam("LEVEL") level: String,
+      @QueryParam("TENANT") tenant: String,
+      @QueryParam("NAME") name: String): String = {
+    httpService.getDynamicConfigs(
+      normalizeParam(level),
+      normalizeParam(tenant),
+      normalizeParam(name))
+  }
+
+  @Path("/workerInfo")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "For MASTER: List worker information of the service. It will list all 
registered workers 's information.\n" +
+        "For WORKER: List the worker information of the worker.")
+  @GET
+  def workerInfo(): String = {
+    httpService.getWorkerInfo
+  }
+
+  @Path("/threadDump")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List the current thread dump.")
+  @GET
+  def threadDump(): String = {
+    httpService.getThreadDump
+  }
+
+  @Path("shuffle")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "For MASTER: List all running shuffle keys of the service. It will 
return all running shuffle's key of the cluster.\n" +
+        "For WORKER: List all the running shuffle keys of the worker. It only 
return keys of shuffles running in that worker.")
+  @GET
+  def shuffles(): String = {
+    httpService.getShuffleList
+  }
+
+  @Path("applications")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "For MASTER: List all running application's ids of the cluster.\n" +
+        "For WORKER: List all running application's ids of the worker. It only 
return application ids running in that worker.")
+  @GET
+  def applications(): String = {
+    httpService.getApplicationList
+  }
+
+  @Path("listTopDiskUsedApps")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "For MASTER: List the top disk usage application ids. It will return the 
top disk usage application ids for the cluster.\n" +
+        "For WORKER: List the top disk usage application ids. It only return 
application ids running in that worker.")
+  @GET
+  def listTopDiskUsedApps(): String = {
+    httpService.listTopDiskUseApps
+  }
+}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
new file mode 100644
index 000000000..2c293bf29
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.api
+
+import javax.servlet.ServletContext
+import javax.servlet.http.HttpServletRequest
+import javax.ws.rs.core.Context
+
+import org.eclipse.jetty.server.handler.ContextHandler
+
+import org.apache.celeborn.server.common.HttpService
+
+private[celeborn] trait ApiRequestContext {
+  @Context
+  protected var servletContext: ServletContext = _
+
+  @Context
+  protected var httpRequest: HttpServletRequest = _
+
+  final protected def httpService: HttpService = 
HttpServiceContext.get(servletContext)
+
+  protected def normalizeParam(param: String): String = 
Option(param).map(_.trim).getOrElse("")
+}
+
+private[celeborn] object HttpServiceContext {
+  private val attribute = getClass.getCanonicalName
+
+  def set(contextHandler: ContextHandler, rs: HttpService): Unit = {
+    contextHandler.setAttribute(attribute, rs)
+  }
+
+  def get(context: ServletContext): HttpService = {
+    context.getAttribute(attribute).asInstanceOf[HttpService]
+  }
+}
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRootResource.scala
similarity index 53%
copy from 
service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
copy to 
service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRootResource.scala
index 2619606b5..6d69c1b58 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRootResource.scala
@@ -15,19 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.server.common.http
+package org.apache.celeborn.server.common.http.api
 
-import io.netty.channel.{ChannelInitializer, SimpleChannelInboundHandler}
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec}
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
+import org.glassfish.jersey.server.ResourceConfig
+import org.glassfish.jersey.servlet.ServletContainer
 
-class HttpServerInitializer(
-    handlers: SimpleChannelInboundHandler[_]) extends 
ChannelInitializer[SocketChannel] {
+import org.apache.celeborn.server.common.HttpService
 
-  override def initChannel(channel: SocketChannel): Unit = {
-    val pipeline = channel.pipeline()
-    pipeline.addLast(new HttpServerCodec())
-      .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
-      .addLast(handlers)
+private[celeborn] object ApiRootResource {
+  def getServletHandler(rs: HttpService): ServletContextHandler = {
+    val openapiConf: ResourceConfig = new OpenAPIConfig
+    val holder = new ServletHolder(new ServletContainer(openapiConf))
+    val handler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
+    handler.setContextPath("/")
+    HttpServiceContext.set(handler, rs)
+    handler.addServlet(holder, "/*")
+    handler
   }
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornOpenApiResource.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornOpenApiResource.scala
new file mode 100644
index 000000000..f0e9cc40a
--- /dev/null
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornOpenApiResource.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.api
+
+import javax.servlet.ServletConfig
+import javax.ws.rs.{GET, Path, PathParam, Produces}
+import javax.ws.rs.core.{Application, Context, HttpHeaders, MediaType, 
Response, UriInfo}
+
+import scala.collection.JavaConverters._
+
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
+import io.swagger.v3.jaxrs2.integration.JaxrsOpenApiContextBuilder
+import io.swagger.v3.jaxrs2.integration.resources.BaseOpenApiResource
+import io.swagger.v3.oas.annotations.Operation
+import io.swagger.v3.oas.integration.api.OpenApiContext
+import io.swagger.v3.oas.models.OpenAPI
+import io.swagger.v3.oas.models.info.{Info, License}
+import io.swagger.v3.oas.models.servers.Server
+import org.apache.commons.lang3.StringUtils
+
+@Path("/openapi.{type:json|yaml}")
+class CelebornOpenApiResource extends BaseOpenApiResource with 
ApiRequestContext {
+  @Context
+  protected var config: ServletConfig = _
+
+  @Context
+  protected var app: Application = _
+
+  @GET
+  @Produces(Array(MediaType.APPLICATION_JSON, "application/yaml"))
+  @Operation(hidden = true)
+  def getOpenApi(
+      @Context headers: HttpHeaders,
+      @Context uriInfo: UriInfo,
+      @PathParam("type") tpe: String): Response = {
+
+    val ctxId = getContextId(config)
+    val ctx: OpenApiContext = new CelebornJaxrsOpenApiContextBuilder()
+      .servletConfig(config)
+      .application(app)
+      .resourcePackages(OpenAPIConfig.packages.toSet.asJava)
+      .configLocation(configLocation)
+      .openApiConfiguration(openApiConfiguration)
+      .ctxId(ctxId)
+      .buildContext(true)
+
+    val openApi = setCelebornOpenAPIDefinition(ctx.read())
+
+    if (StringUtils.isNotBlank(tpe) && tpe.trim().equalsIgnoreCase("yaml")) {
+      Response.status(Response.Status.OK)
+        .entity(
+          ctx.getOutputYamlMapper()
+            .writer(new DefaultPrettyPrinter())
+            .writeValueAsString(openApi))
+        .`type`("application/yaml")
+        .build()
+    } else {
+      Response.status(Response.Status.OK)
+        .entity(
+          ctx.getOutputJsonMapper
+            .writer(new DefaultPrettyPrinter())
+            .writeValueAsString(openApi))
+        .`type`(MediaType.APPLICATION_JSON_TYPE)
+        .build()
+    }
+  }
+
+  private def setCelebornOpenAPIDefinition(openApi: OpenAPI): OpenAPI = {
+    // TODO: to improve when https is enabled.
+    val apiUrl = s"http://${httpService.connectionUrl}/";
+    openApi.info(
+      new Info().title(
+        s"Apache Celeborn (Incubating) REST API Documentation")
+        .description(s"Role: ${httpService.serviceName}")
+        .license(
+          new License().name("Apache License 2.0")
+            .url("https://www.apache.org/licenses/LICENSE-2.0.txt";)))
+      .servers(List(new Server().url(apiUrl)).asJava)
+  }
+}
+
+class CelebornJaxrsOpenApiContextBuilder
+  extends JaxrsOpenApiContextBuilder[CelebornJaxrsOpenApiContextBuilder]
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornScalaObjectMapper.scala
similarity index 57%
copy from 
service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
copy to 
service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornScalaObjectMapper.scala
index 2619606b5..37375e2c1 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornScalaObjectMapper.scala
@@ -15,19 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.server.common.http
+package org.apache.celeborn.server.common.http.api
 
-import io.netty.channel.{ChannelInitializer, SimpleChannelInboundHandler}
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec}
+import javax.ws.rs.ext.ContextResolver
 
-class HttpServerInitializer(
-    handlers: SimpleChannelInboundHandler[_]) extends 
ChannelInitializer[SocketChannel] {
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
-  override def initChannel(channel: SocketChannel): Unit = {
-    val pipeline = channel.pipeline()
-    pipeline.addLast(new HttpServerCodec())
-      .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
-      .addLast(handlers)
-  }
+class CelebornScalaObjectMapper extends ContextResolver[ObjectMapper] {
+  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
+  override def getContext(aClass: Class[_]): ObjectMapper = mapper
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/OpenAPIConfig.scala
similarity index 57%
rename from 
service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
rename to 
service/src/main/scala/org/apache/celeborn/server/common/http/api/OpenAPIConfig.scala
index 2619606b5..07053de4c 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/OpenAPIConfig.scala
@@ -15,19 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.server.common.http
+package org.apache.celeborn.server.common.http.api
 
-import io.netty.channel.{ChannelInitializer, SimpleChannelInboundHandler}
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec}
+import org.glassfish.jersey.server.ResourceConfig
 
-class HttpServerInitializer(
-    handlers: SimpleChannelInboundHandler[_]) extends 
ChannelInitializer[SocketChannel] {
+class OpenAPIConfig extends ResourceConfig {
+  packages(OpenAPIConfig.packages: _*)
+  register(classOf[CelebornOpenApiResource])
+  register(classOf[CelebornScalaObjectMapper])
+}
 
-  override def initChannel(channel: SocketChannel): Unit = {
-    val pipeline = channel.pipeline()
-    pipeline.addLast(new HttpServerCodec())
-      .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
-      .addLast(handlers)
-  }
+object OpenAPIConfig {
+  val packages = Seq(
+    "org.apache.celeborn.server.common.http.api",
+    "org.apache.celeborn.service.deploy.master.http.api",
+    "org.apache.celeborn.service.deploy.worker.http.api")
 }
diff --git a/service/src/test/resources/metrics-api.properties 
b/service/src/test/resources/metrics-api.properties
new file mode 100644
index 000000000..b1b2e6725
--- /dev/null
+++ b/service/src/test/resources/metrics-api.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
+*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
new file mode 100644
index 000000000..632d06c79
--- /dev/null
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+import javax.ws.rs.core.MediaType
+
+import org.apache.celeborn.common.CelebornConf
+
+abstract class ApiBaseResourceSuite extends HttpTestHelper {
+  celebornConf.set(CelebornConf.METRICS_ENABLED.key, "true")
+    .set(
+      CelebornConf.METRICS_CONF.key,
+      
Thread.currentThread().getContextClassLoader.getResource("metrics-api.properties").getFile)
+
+  test("ping") {
+    val response = webTarget.path("ping").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+    assert(response.readEntity(classOf[String]) == "pong")
+  }
+
+  test("conf") {
+    val response = webTarget.path("conf").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("listDynamicConfigs") {
+    val response = webTarget.path("listDynamicConfigs")
+      .queryParam("LEVEL", "TENANT")
+      .request(MediaType.TEXT_PLAIN)
+      .get()
+    assert(200 == response.getStatus)
+  }
+
+  test("workerInfo") {
+    val response = 
webTarget.path("workerInfo").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("threadDump") {
+    val response = 
webTarget.path("threadDump").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("shuffle") {
+    val response = 
webTarget.path("shuffle").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("applications") {
+    val response = 
webTarget.path("applications").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("listTopDiskUsedApps") {
+    val response = 
webTarget.path("listTopDiskUsedApps").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("openapi.json") {
+    val response = 
webTarget.path("openapi.json").request(MediaType.APPLICATION_JSON).get()
+    assert(200 == response.getStatus)
+    assert(response.readEntity(classOf[String]).contains("/conf"))
+  }
+
+  test("swagger") {
+    Seq("swagger", "docs", "help").foreach { path =>
+      val response = webTarget.path(path).request(MediaType.TEXT_HTML).get()
+      assert(200 == response.getStatus)
+      assert(response.readEntity(classOf[String]).contains("swagger-ui"))
+    }
+  }
+
+  test("metrics/prometheus") {
+    val response = 
webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get()
+    assert(200 == response.getStatus)
+    
assert(response.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value"))
+  }
+
+  test("metrics/json") {
+    val response = 
webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get()
+    assert(200 == response.getStatus)
+    assert(response.readEntity(classOf[String]).contains("\"name\" : 
\"jvm.memory.heap.max\""))
+  }
+}
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
new file mode 100644
index 000000000..8674fc39c
--- /dev/null
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+import java.net.URI
+import javax.ws.rs.client.WebTarget
+import javax.ws.rs.core.{Application, UriBuilder}
+
+import org.glassfish.jersey.client.ClientConfig
+import org.glassfish.jersey.media.multipart.MultiPartFeature
+import org.glassfish.jersey.server.ResourceConfig
+import org.glassfish.jersey.test.JerseyTest
+import org.glassfish.jersey.test.jetty.JettyTestContainerFactory
+import org.glassfish.jersey.test.spi.TestContainerFactory
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.server.common.HttpService
+import org.apache.celeborn.server.common.http.HttpTestHelper.RestApiBaseSuite
+import org.apache.celeborn.server.common.http.api.CelebornScalaObjectMapper
+
+object HttpTestHelper {
+  class RestApiBaseSuite extends JerseyTest {
+
+    override def configure: Application = new ResourceConfig(getClass)
+      .register(classOf[MultiPartFeature])
+
+    override def configureClient(config: ClientConfig): Unit = {
+      config.register(classOf[CelebornScalaObjectMapper])
+        .register(classOf[MultiPartFeature])
+    }
+
+    override def getTestContainerFactory: TestContainerFactory = new 
JettyTestContainerFactory
+  }
+}
+
+trait HttpTestHelper extends AnyFunSuite
+  with BeforeAndAfterAll
+  with BeforeAndAfterEach
+  with Logging {
+
+  protected val celebornConf = new CelebornConf()
+  protected def httpService: HttpService
+
+  protected val restApiBaseSuite: JerseyTest = new RestApiBaseSuite
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    restApiBaseSuite.setUp()
+  }
+
+  override def afterAll(): Unit = {
+    restApiBaseSuite.tearDown()
+    super.afterAll()
+  }
+
+  protected lazy val baseUri: URI =
+    UriBuilder.fromUri(s"http://${httpService.connectionUrl}/";).build()
+
+  protected lazy val webTarget: WebTarget = 
restApiBaseSuite.client.target(baseUri)
+}
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
deleted file mode 100644
index cbb5d18ef..000000000
--- 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.server.common.http
-
-import org.scalatest.funsuite.AnyFunSuite
-
-import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.server.common.Service
-
-class HttpUtilsSuite extends AnyFunSuite with Logging {
-
-  def checkParseUri(
-      uri: String,
-      expectPath: String,
-      expectParameters: Map[String, String]): Unit = {
-    val (path, parameters) = HttpUtils.parseUri(uri)
-    assert(path == expectPath)
-    assert(parameters == expectParameters)
-  }
-
-  test("CELEBORN-448: Support exclude worker manually") {
-    checkParseUri("/exclude", "/exclude", Map.empty)
-    checkParseUri(
-      "/exclude?add=localhost:1001:1002:1003:1004",
-      "/exclude",
-      Map("ADD" -> "localhost:1001:1002:1003:1004"))
-    checkParseUri(
-      "/exclude?remove=localhost:1001:1002:1003:1004",
-      "/exclude",
-      Map("REMOVE" -> "localhost:1001:1002:1003:1004"))
-    checkParseUri(
-      
"/exclude?add=localhost:1001:1002:1003:1004&remove=localhost:2001:2002:2003:2004",
-      "/exclude",
-      Map("ADD" -> "localhost:1001:1002:1003:1004", "REMOVE" -> 
"localhost:2001:2002:2003:2004"))
-  }
-
-  test("CELEBORN-847: Support parse HTTP Restful API parameters") {
-    checkParseUri("/exit", "/exit", Map.empty)
-    checkParseUri("/exit?type=decommission", "/exit", Map("TYPE" -> 
"decommission"))
-    checkParseUri(
-      "/exit?type=decommission&foo=A",
-      "/exit",
-      Map("TYPE" -> "decommission", "FOO" -> "A"))
-  }
-
-  test("CELEBORN-829: Improve response message of invalid HTTP request") {
-    assert(HttpUtils.help(Service.MASTER) ==
-      s"""Available API providers include:
-         |/applications        List all running application's ids of the 
cluster.
-         |/conf                List the conf setting of the master.
-         |/exclude             Excluded workers of the master add or remove 
the worker manually given worker id. The parameter add or remove specifies the 
excluded workers to add or remove, which value is separated by commas.
-         |/excludedWorkers     List all excluded workers of the master.
-         |/help                List the available API providers of the master.
-         |/hostnames           List all running application's 
LifecycleManager's hostnames of the cluster.
-         |/listDynamicConfigs  List the dynamic configs of the master. The 
parameter level specifies the config level of dynamic configs. The parameter 
tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter 
name specifies the user name of TENANT_USER level. Meanwhile, either none or 
all of the parameter tenant and name are specified for TENANT_USER level.
-         |/listTopDiskUsedApps List the top disk usage application ids. It 
will return the top disk usage application ids for the cluster.
-         |/lostWorkers         List all lost workers of the master.
-         |/masterGroupInfo     List master group information of the service. 
It will list all master's LEADER, FOLLOWER information.
-         |/sendWorkerEvent     For Master(Leader) can send worker event to 
manager workers. Legal types are 'None', 'Immediately', 'Decommission', 
'DecommissionThenIdle', 'Graceful', 'Recommission'
-         |/shuffles            List all running shuffle keys of the service. 
It will return all running shuffle's key of the cluster.
-         |/shutdownWorkers     List all shutdown workers of the master.
-         |/threadDump          List the current thread dump of the master.
-         |/workerEventInfo     List all worker event infos of the master.
-         |/workerInfo          List worker information of the service. It will 
list all registered workers 's information.
-         |""".stripMargin)
-    assert(HttpUtils.help(Service.WORKER) ==
-      s"""Available API providers include:
-         |/applications              List all running application's ids of the 
worker. It only return application ids running in that worker.
-         |/conf                      List the conf setting of the worker.
-         |/exit                      Trigger this worker to exit. Legal types 
are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.
-         |/help                      List the available API providers of the 
worker.
-         |/isRegistered              Show if the worker is registered to the 
master success.
-         |/isShutdown                Show if the worker is during the process 
of shutdown.
-         |/listDynamicConfigs        List the dynamic configs of the worker. 
The parameter level specifies the config level of dynamic configs. The 
parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The 
parameter name specifies the user name of TENANT_USER level. Meanwhile, either 
none or all of the parameter tenant and name are specified for TENANT_USER 
level.
-         |/listPartitionLocationInfo List all the living PartitionLocation 
information in that worker.
-         |/listTopDiskUsedApps       List the top disk usage application ids. 
It only return application ids running in that worker.
-         |/shuffles                  List all the running shuffle keys of the 
worker. It only return keys of shuffles running in that worker.
-         |/threadDump                List the current thread dump of the 
worker.
-         |/unavailablePeers          List the unavailable peers of the worker, 
this always means the worker connect to the peer failed.
-         |/workerInfo                List the worker information of the worker.
-         |""".stripMargin)
-  }
-
-  test("CELEBORN-1245: Support Master manage workers") {
-    checkParseUri(
-      
"/sendWorkerEvent?type=decommission&workers=localhost:1001:1002:1003:1004",
-      "/sendWorkerEvent",
-      Map("TYPE" -> "decommission", "WORKERS" -> 
"localhost:1001:1002:1003:1004"))
-  }
-
-  test("CELEBORN-1056: Introduce Rest API of listing dynamic configuration") {
-    checkParseUri("/listDynamicConfigs", "/listDynamicConfigs", Map.empty)
-    checkParseUri(
-      "/listDynamicConfigs?level=system",
-      "/listDynamicConfigs",
-      Map("LEVEL" -> "system"))
-    checkParseUri(
-      "/listDynamicConfigs?level=tenant&tenant=tenantId1",
-      "/listDynamicConfigs",
-      Map("LEVEL" -> "tenant", "TENANT" -> "tenantId1"))
-    checkParseUri(
-      "/listDynamicConfigs?level=tenant_user&tenant=tenantId1&name=user1",
-      "/listDynamicConfigs",
-      Map("LEVEL" -> "tenant_user", "TENANT" -> "tenantId1", "NAME" -> 
"user1"))
-  }
-}
diff --git a/tests/spark-it/pom.xml b/tests/spark-it/pom.xml
index 75e50221b..842d7a8ed 100644
--- a/tests/spark-it/pom.xml
+++ b/tests/spark-it/pom.xml
@@ -75,6 +75,14 @@
           <groupId>log4j</groupId>
           <artifactId>log4j</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.inject</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
diff --git a/worker/pom.xml b/worker/pom.xml
index c4a7c913d..3821fe9da 100644
--- a/worker/pom.xml
+++ b/worker/pom.xml
@@ -98,6 +98,12 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+    </dependency>
     <dependency>
       <groupId>org.apache.celeborn</groupId>
       <artifactId>celeborn-master_${scala.binary.version}</artifactId>
@@ -109,6 +115,16 @@
       <artifactId>mockito-scala-scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.test-framework</groupId>
+      <artifactId>jersey-test-framework-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+      <artifactId>jersey-test-framework-provider-jetty</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala
new file mode 100644
index 000000000..8573b7152
--- /dev/null
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.http.api
+
+import javax.ws.rs.{GET, Path, POST, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.responses.ApiResponse
+
+import org.apache.celeborn.server.common.http.api.ApiRequestContext
+
+@Path("/")
+class ApiWorkerResource extends ApiRequestContext {
+  @Path("/listPartitionLocationInfo")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all the living PartitionLocation information in that 
worker.")
+  @GET
+  def listPartitionLocationInfo: String = httpService.listPartitionLocationInfo
+
+  @Path("/unavailablePeers")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "List the unavailable peers of the worker, this always means the worker 
connect to the peer failed.")
+  @GET
+  def unavailablePeers: String = httpService.getUnavailablePeers
+
+  @Path("/isShutdown")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "Show if the worker is during the process of shutdown.")
+  @GET
+  def isShutdown: String = httpService.isShutdown
+
+  @Path("/isRegistered")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "Show if the worker is registered to the master success.")
+  @GET
+  def isRegistered: String = httpService.isRegistered
+
+  @Path("/exit")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL' 
and 'IMMEDIATELY'.")
+  @POST
+  def exit(@QueryParam("TYPE") exitType: String): String = {
+    httpService.exit(normalizeParam(exitType))
+  }
+}
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
new file mode 100644
index 000000000..cc09764d0
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import javax.ws.rs.core.MediaType
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.server.common.HttpService
+import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
+import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
+
+class ApiWorkerResourceSuite extends ApiBaseResourceSuite {
+  private var worker: Worker = _
+  override protected def httpService: HttpService = worker
+
+  override def beforeAll(): Unit = {
+    val workerArgs = new WorkerArguments(Array(), celebornConf)
+    worker = new Worker(celebornConf, workerArgs)
+    worker.metricsSystem.start()
+    worker.startHttpServer()
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    worker.metricsSystem.stop()
+    worker.rpcEnv.shutdown()
+    worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+  }
+
+  test("listPartitionLocationInfo") {
+    val response = 
webTarget.path("listPartitionLocationInfo").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("unavailablePeers") {
+    val response = 
webTarget.path("unavailablePeers").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("isShutdown") {
+    val response = 
webTarget.path("isShutdown").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("isRegistered") {
+    val response = 
webTarget.path("isRegistered").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+}

Reply via email to