This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new adbc77cd4 [CELEBORN-1317] Refine celeborn http server and support
swagger ui
adbc77cd4 is described below
commit adbc77cd4ff5a648d25fe01c3dabc87ee5a4869f
Author: Fei Wang <[email protected]>
AuthorDate: Wed Mar 27 23:18:18 2024 +0800
[CELEBORN-1317] Refine celeborn http server and support swagger ui
### What changes were proposed in this pull request?
Previously, there was no HTTP request spec, such as query params, HTTP method and
response media type.
And each API required its own HttpEndpoint class.
In this PR, we refine the code for http service and provide swagger ui.
Note that this PR does not change the original API request and response
behavior, including the metrics APIs.
TODO:
1. define DTO
2. http request authentication
<img width="1900" alt="image"
src="https://github.com/apache/incubator-celeborn/assets/6757692/7f8c2363-170d-4bdf-b2c9-74260e31d3e5">
<img width="1138" alt="image"
src="https://github.com/apache/incubator-celeborn/assets/6757692/3ae6ec8e-00a8-475b-bb37-0329536185f6">
### Why are the changes needed?
To close CELEBORN-1317
### Does this PR introduce _any_ user-facing change?
The API is aligned with the previous behavior.
### How was this patch tested?
UT.
Closes #2371 from turboFei/jetty.
Authored-by: Fei Wang <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
LICENSE-binary | 25 ++
.../org/apache/celeborn/common/CelebornConf.scala | 40 +++
.../celeborn/common/metrics/MetricsUtils.scala | 22 +-
.../celeborn/common/metrics/sink/CsvSink.scala | 4 +-
.../common/metrics/sink/GraphiteSink.scala | 4 +-
dev/deps/dependencies-server | 44 +++-
docs/configuration/master.md | 2 +
docs/configuration/worker.md | 2 +
master/pom.xml | 16 ++
.../deploy/master/http/api/ApiMasterResource.scala | 112 +++++++++
.../deploy/master/ApiMasterResourceSuite.scala | 108 +++++++++
pom.xml | 112 ++++++++-
project/CelebornBuild.scala | 56 ++++-
service/pom.xml | 100 +++++++-
.../org/apache/celeborn/swagger/index.html | 73 ++++++
.../celeborn/common/metrics/MetricsSystem.scala | 9 +-
.../common/metrics/sink/AbstractServlet.scala | 18 +-
.../celeborn/common/metrics/sink/JsonServlet.scala | 21 +-
.../common/metrics/sink/PrometheusServlet.scala | 20 +-
.../celeborn/server/common/HttpService.scala | 56 ++++-
.../celeborn/server/common/http/HttpEndpoint.scala | 269 ---------------------
.../server/common/http/HttpRequestHandler.scala | 77 ------
.../celeborn/server/common/http/HttpServer.scala | 137 +++++++----
.../celeborn/server/common/http/HttpUtils.scala | 165 ++++++++-----
.../server/common/http/api/ApiBaseResource.scala | 127 ++++++++++
.../server/common/http/api/ApiRequestContext.scala | 50 ++++
.../ApiRootResource.scala} | 25 +-
.../common/http/api/CelebornOpenApiResource.scala | 98 ++++++++
.../CelebornScalaObjectMapper.scala} | 20 +-
.../OpenAPIConfig.scala} | 24 +-
service/src/test/resources/metrics-api.properties | 18 ++
.../server/common/http/ApiBaseResourceSuite.scala | 99 ++++++++
.../server/common/http/HttpTestHelper.scala | 78 ++++++
.../server/common/http/HttpUtilsSuite.scala | 121 ---------
tests/spark-it/pom.xml | 8 +
worker/pom.xml | 16 ++
.../deploy/worker/http/api/ApiWorkerResource.scala | 78 ++++++
.../worker/storage/ApiWorkerResourceSuite.scala | 66 +++++
38 files changed, 1615 insertions(+), 705 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index a33986495..034141317 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -248,6 +248,11 @@ io.netty:netty-transport-native-unix-common
io.netty:netty-transport-rxtx
io.netty:netty-transport-sctp
io.netty:netty-transport-udt
+io.swagger.core.v3:swagger-annotations
+io.swagger.core.v3:swagger-core
+io.swagger.core.v3:swagger-integration
+io.swagger.core.v3:swagger-jaxrs2
+io.swagger.core.v3:swagger-models
org.apache.commons:commons-crypto
org.apache.commons:commons-lang3
org.apache.hadoop:hadoop-client-api
@@ -267,6 +272,13 @@ org.apache.ratis:ratis-server
org.apache.ratis:ratis-server-api
org.apache.ratis:ratis-shell
org.apache.ratis:ratis-thirdparty-misc
+org.eclipse.jetty:jetty-http
+org.eclipse.jetty:jetty-io
+org.eclipse.jetty:jetty-security
+org.eclipse.jetty:jetty-server
+org.eclipse.jetty:jetty-servlet
+org.eclipse.jetty:jetty-util-ajax
+org.eclipse.jetty:jetty-util
org.javassist:javassist
org.reflections:reflections
org.roaringbitmap:RoaringBitmap
@@ -275,6 +287,7 @@ org.rocksdb:rocksdbjni
org.scala-lang:scala-library
org.scala-lang:scala-reflect
org.slf4j:jcl-over-slf4j
+org.webjars:swagger-ui
org.xerial.snappy:snappy-java
org.yaml:snakeyaml
@@ -308,3 +321,15 @@ org.slf4j:slf4j-api
------------
See licenses/LICENSE-javassist.txt for detail.
org.javassist:javassist
+
+Eclipse Public License (EPL) 2.0
+--------------------------------
+jakarta.annotation:jakarta.annotation-api
+jakarta.servlet:jakarta.servlet-api
+jakarta.ws.rs:jakarta.ws.rs-api
+org.glassfish.hk2:hk2-api
+org.glassfish.hk2:hk2-locator
+org.glassfish.hk2:hk2-utils
+org.glassfish.hk2.external:aopalliance-repackaged
+org.glassfish.hk2.external:jakarta.inject
+org.glassfish.hk2:osgi-resource-locator
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 905cfad57..2e4bd2d5a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -586,6 +586,10 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def masterHttpPort: Int = get(MASTER_HTTP_PORT)
+ def masterHttpMaxWorkerThreads: Int = get(MASTER_HTTP_MAX_WORKER_THREADS)
+
+ def masterHttpStopTimeout: Long = get(MASTER_HTTP_STOP_TIMEOUT)
+
def haEnabled: Boolean = get(HA_ENABLED)
def haMasterNodeId: Option[String] = get(HA_MASTER_NODE_ID)
@@ -676,6 +680,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerHttpHost: String =
get(WORKER_HTTP_HOST).replace("<localhost>", Utils.localHostName(this))
def workerHttpPort: Int = get(WORKER_HTTP_PORT)
+ def workerHttpMaxWorkerThreads: Int = get(WORKER_HTTP_MAX_WORKER_THREADS)
+ def workerHttpStopTimeout: Long = get(WORKER_HTTP_STOP_TIMEOUT)
def workerRpcPort: Int = get(WORKER_RPC_PORT)
def workerPushPort: Int = get(WORKER_PUSH_PORT)
def workerFetchPort: Int = get(WORKER_FETCH_PORT)
@@ -1976,6 +1982,23 @@ object CelebornConf extends Logging {
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9098)
+ val MASTER_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.master.http.maxWorkerThreads")
+ .categories("master")
+ .version("0.5.0")
+ .doc("Maximum number of threads in the master http worker thread pool.")
+ .intConf
+ .checkValue(_ > 0, "Must be positive.")
+ .createWithDefault(999)
+
+ val MASTER_HTTP_STOP_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.master.http.stopTimeout")
+ .categories("master")
+ .version("0.5.0")
+ .doc("Master http server stop timeout.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("5s")
+
val HA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.enabled")
.withAlternative("celeborn.ha.enabled")
@@ -2534,6 +2557,23 @@ object CelebornConf extends Logging {
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9096)
+ val WORKER_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.worker.http.maxWorkerThreads")
+ .categories("worker")
+ .version("0.5.0")
+ .doc("Maximum number of threads in the worker http worker thread pool.")
+ .intConf
+ .checkValue(_ > 0, "Must be positive.")
+ .createWithDefault(999)
+
+ val WORKER_HTTP_STOP_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.worker.http.stopTimeout")
+ .categories("worker")
+ .version("0.5.0")
+ .doc("Worker http server stop timeout.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("5s")
+
val WORKER_RPC_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.rpc.port")
.categories("worker")
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsUtils.scala
similarity index 57%
copy from
service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
copy to
common/src/main/scala/org/apache/celeborn/common/metrics/MetricsUtils.scala
index 2619606b5..0d5086bbb 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsUtils.scala
@@ -15,19 +15,19 @@
* limitations under the License.
*/
-package org.apache.celeborn.server.common.http
+package org.apache.celeborn.common.metrics
-import io.netty.channel.{ChannelInitializer, SimpleChannelInboundHandler}
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec}
+import java.util.concurrent.TimeUnit
-class HttpServerInitializer(
- handlers: SimpleChannelInboundHandler[_]) extends
ChannelInitializer[SocketChannel] {
+object MetricsUtils {
+  private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+  private[this] val MINIMAL_POLL_PERIOD = 1
- override def initChannel(channel: SocketChannel): Unit = {
- val pipeline = channel.pipeline()
- pipeline.addLast(new HttpServerCodec())
- .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
- .addLast(handlers)
+
+  /**
+   * Validates that a metrics sink's polling period is not shorter than the minimal
+   * supported period (MINIMAL_POLL_PERIOD x MINIMAL_POLL_UNIT, i.e. 1 second).
+   *
+   * @param pollUnit   the time unit in which `pollPeriod` is expressed
+   * @param pollPeriod the polling period, expressed in `pollUnit`
+   * @throws IllegalArgumentException if the period is shorter than the minimum
+   */
+  def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int): Unit = {
+    // TimeUnit.convert truncates finer -> coarser, so e.g. 1500 ms counts as 1 s and passes,
+    // while anything under 1000 ms converts to 0 s and is rejected.
+    val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
+    if (period < MINIMAL_POLL_PERIOD) {
+      throw new IllegalArgumentException(
+        s"Polling period $pollPeriod $pollUnit is below the minimal polling period " +
+          s"$MINIMAL_POLL_PERIOD $MINIMAL_POLL_UNIT")
+    }
   }
 }
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala
index 34097b34d..52ccf4638 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.{CsvReporter, MetricRegistry}
-import org.apache.celeborn.common.metrics.MetricsSystem
+import org.apache.celeborn.common.metrics.MetricsUtils
class CsvSink(val property: Properties, val registry: MetricRegistry) extends
Sink {
val CSV_KEY_PERIOD = "period"
@@ -44,7 +44,7 @@ class CsvSink(val property: Properties, val registry:
MetricRegistry) extends Si
case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
}
- MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+ MetricsUtils.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
case Some(s) => s
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala
index cc97b9bef..b2be72fff 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP}
-import org.apache.celeborn.common.metrics.MetricsSystem
+import org.apache.celeborn.common.metrics.MetricsUtils
private class GraphiteSink(val property: Properties, val registry:
MetricRegistry) extends Sink {
val GRAPHITE_DEFAULT_PERIOD = 10
@@ -62,7 +62,7 @@ private class GraphiteSink(val property: Properties, val
registry: MetricRegistr
val prefix =
propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX)
- MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+ MetricsUtils.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val graphite =
propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase(Locale.ROOT)) match {
case Some("udp") => new GraphiteUDP(host, port)
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index eadf37b17..ca5a32e7e 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -17,7 +17,9 @@
HikariCP/4.0.3//HikariCP-4.0.3.jar
RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar
+aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
+classgraph/4.8.138//classgraph-4.8.138.jar
commons-cli/1.5.0//commons-cli-1.5.0.jar
commons-crypto/1.0.0//commons-crypto-1.0.0.jar
commons-io/2.13.0//commons-io-2.13.0.jar
@@ -27,13 +29,43 @@ failureaccess/1.0.1//failureaccess-1.0.1.jar
guava/32.1.3-jre//guava-32.1.3-jre.jar
hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar
hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
+hk2-api/2.6.1//hk2-api-2.6.1.jar
+hk2-locator/2.6.1//hk2-locator-2.6.1.jar
+hk2-utils/2.6.1//hk2-utils-2.6.1.jar
jackson-annotations/2.15.3//jackson-annotations-2.15.3.jar
jackson-core/2.15.3//jackson-core-2.15.3.jar
jackson-databind/2.15.3//jackson-databind-2.15.3.jar
+jackson-dataformat-yaml/2.13.2//jackson-dataformat-yaml-2.13.2.jar
+jackson-datatype-jsr310/2.13.2//jackson-datatype-jsr310-2.13.2.jar
+jackson-jaxrs-base/2.13.2//jackson-jaxrs-base-2.13.2.jar
+jackson-jaxrs-json-provider/2.13.2//jackson-jaxrs-json-provider-2.13.2.jar
+jackson-module-jaxb-annotations/2.14.1//jackson-module-jaxb-annotations-2.14.1.jar
jackson-module-scala_2.12/2.15.3//jackson-module-scala_2.12-2.15.3.jar
-javassist/3.28.0-GA//javassist-3.28.0-GA.jar
-javax.servlet-api/3.1.0//javax.servlet-api-3.1.0.jar
+jakarta.annotation-api/1.3.5//jakarta.annotation-api-1.3.5.jar
+jakarta.inject/2.6.1//jakarta.inject-2.6.1.jar
+jakarta.servlet-api/4.0.4//jakarta.servlet-api-4.0.4.jar
+jakarta.validation-api/2.0.2//jakarta.validation-api-2.0.2.jar
+jakarta.ws.rs-api/2.1.6//jakarta.ws.rs-api-2.1.6.jar
+jakarta.xml.bind-api/2.3.3//jakarta.xml.bind-api-2.3.3.jar
+javassist/3.29.0-GA//javassist-3.29.0-GA.jar
jcl-over-slf4j/1.7.36//jcl-over-slf4j-1.7.36.jar
+jersey-client/2.39.1//jersey-client-2.39.1.jar
+jersey-common/2.39.1//jersey-common-2.39.1.jar
+jersey-container-servlet-core/2.39.1//jersey-container-servlet-core-2.39.1.jar
+jersey-entity-filtering/2.39.1//jersey-entity-filtering-2.39.1.jar
+jersey-hk2/2.39.1//jersey-hk2-2.39.1.jar
+jersey-media-json-jackson/2.39.1//jersey-media-json-jackson-2.39.1.jar
+jersey-media-multipart/2.39.1//jersey-media-multipart-2.39.1.jar
+jersey-server/2.39.1//jersey-server-2.39.1.jar
+jetty-client/9.4.52.v20230823//jetty-client-9.4.52.v20230823.jar
+jetty-http/9.4.52.v20230823//jetty-http-9.4.52.v20230823.jar
+jetty-io/9.4.52.v20230823//jetty-io-9.4.52.v20230823.jar
+jetty-proxy/9.4.52.v20230823//jetty-proxy-9.4.52.v20230823.jar
+jetty-security/9.4.52.v20230823//jetty-security-9.4.52.v20230823.jar
+jetty-server/9.4.52.v20230823//jetty-server-9.4.52.v20230823.jar
+jetty-servlet/9.4.52.v20230823//jetty-servlet-9.4.52.v20230823.jar
+jetty-util-ajax/9.4.52.v20230823//jetty-util-ajax-9.4.52.v20230823.jar
+jetty-util/9.4.52.v20230823//jetty-util-9.4.52.v20230823.jar
jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
@@ -46,6 +78,7 @@ maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
+mimepull/1.9.15//mimepull-1.9.15.jar
mybatis/3.5.15//mybatis-3.5.15.jar
netty-all/4.1.107.Final//netty-all-4.1.107.Final.jar
netty-buffer/4.1.107.Final//netty-buffer-4.1.107.Final.jar
@@ -81,6 +114,7 @@
netty-transport-rxtx/4.1.107.Final//netty-transport-rxtx-4.1.107.Final.jar
netty-transport-sctp/4.1.107.Final//netty-transport-sctp-4.1.107.Final.jar
netty-transport-udt/4.1.107.Final//netty-transport-udt-4.1.107.Final.jar
netty-transport/4.1.107.Final//netty-transport-4.1.107.Final.jar
+osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
paranamer/2.8//paranamer-2.8.jar
protobuf-java/3.21.7//protobuf-java-3.21.7.jar
ratis-client/2.5.1//ratis-client-2.5.1.jar
@@ -101,4 +135,10 @@ shims/0.9.32//shims-0.9.32.jar
slf4j-api/1.7.36//slf4j-api-1.7.36.jar
snakeyaml/2.2//snakeyaml-2.2.jar
snappy-java/1.1.10.5//snappy-java-1.1.10.5.jar
+swagger-annotations/2.2.1//swagger-annotations-2.2.1.jar
+swagger-core/2.2.1//swagger-core-2.2.1.jar
+swagger-integration/2.2.1//swagger-integration-2.2.1.jar
+swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar
+swagger-models/2.2.1//swagger-models-2.2.1.jar
+swagger-ui/4.9.1//swagger-ui-4.9.1.jar
zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 7a504441d..13f1d5c40 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -43,7 +43,9 @@ license: |
| celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat
timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout |
| celeborn.master.host | <localhost> | false | Hostname for master to
bind. | 0.2.0 | |
| celeborn.master.http.host | <localhost> | false | Master's http host.
| 0.4.0 |
celeborn.metrics.master.prometheus.host,celeborn.master.metrics.prometheus.host
|
+| celeborn.master.http.maxWorkerThreads | 999 | false | Maximum number of
threads in the master http worker thread pool. | 0.5.0 | |
| celeborn.master.http.port | 9098 | false | Master's http port. | 0.4.0 |
celeborn.metrics.master.prometheus.port,celeborn.master.metrics.prometheus.port
|
+| celeborn.master.http.stopTimeout | 5s | false | Master http server stop
timeout. | 0.5.0 | |
| celeborn.master.internal.port | 8097 | false | Internal port on the master
where both workers and other master nodes connect. | 0.5.0 | |
| celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | |
| celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for
refreshing the node rack information periodically. | 0.5.0 | |
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 68ad3f494..484cf1b6d 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -83,7 +83,9 @@ license: |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false
| Whether to call sync method to save committed file infos into Level DB to
handle OS crash. | 0.3.1 | |
| celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's
graceful shutdown timeout time. | 0.2.0 | |
| celeborn.worker.http.host | <localhost> | false | Worker's http host.
| 0.4.0 |
celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host
|
+| celeborn.worker.http.maxWorkerThreads | 999 | false | Maximum number of
threads in the worker http worker thread pool. | 0.5.0 | |
| celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 |
celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port
|
+| celeborn.worker.http.stopTimeout | 5s | false | Worker http server stop
timeout. | 0.5.0 | |
| celeborn.worker.internal.port | 0 | false | Internal server port on the
Worker where the master nodes connect. | 0.5.0 | |
| celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling
via async_profiler in workers. | 0.5.0 | |
| celeborn.worker.jvmProfiler.localDir | . | false | Local file system path on
worker where profiler output is saved. Defaults to the working directory of the
worker process. | 0.5.0 | |
diff --git a/master/pom.xml b/master/pom.xml
index a5260a1ce..4dd3ed11c 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -83,11 +83,27 @@
<artifactId>log4j-1.2-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+ <artifactId>jersey-test-framework-provider-jetty</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
new file mode 100644
index 000000000..2093dee03
--- /dev/null
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master.http.api
+
+import javax.ws.rs.{GET, Path, POST, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.responses.ApiResponse
+
+import org.apache.celeborn.server.common.http.api.ApiRequestContext
+
+/**
+ * JAX-RS resource serving the master-only HTTP endpoints. Every endpoint declares a
+ * `text/plain` 200 response and delegates to `httpService`, obtained from
+ * [[ApiRequestContext]]. Query parameters are passed through `normalizeParam` (also from
+ * [[ApiRequestContext]]) before being handed to the service.
+ */
+@Path("/")
+class ApiMasterResource extends ApiRequestContext {
+
+  @Path("/masterGroupInfo")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "List master group information of the service. It will list all master's LEADER, FOLLOWER information.")
+  @GET
+  def masterGroupInfo: String = httpService.getMasterGroupInfo
+
+  @Path("/lostWorkers")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all lost workers of the master.")
+  @GET
+  def lostWorkers: String = httpService.getLostWorkers
+
+  @Path("/excludedWorkers")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all excluded workers of the master.")
+  @GET
+  def excludedWorkers: String = httpService.getExcludedWorkers
+
+  @Path("/shutdownWorkers")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all shutdown workers of the master.")
+  @GET
+  def shutdownWorkers: String = httpService.getShutdownWorkers
+
+  @Path("/hostnames")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all running application's LifecycleManager's hostnames of the cluster.")
+  @GET
+  def hostnames: String = httpService.getHostnameList
+
+  @Path("/sendWorkerEvent")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "For Master(Leader) can send worker event to manager workers. Legal types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 'Recommission'")
+  @POST
+  def sendWorkerEvent(
+      @QueryParam("TYPE") eventType: String,
+      @QueryParam("WORKERS") workers: String): String = {
+    httpService.handleWorkerEvent(normalizeParam(eventType), normalizeParam(workers))
+  }
+
+  @Path("/workerEventInfo")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all worker event infos of the master.")
+  @GET
+  def workerEventInfo: String = httpService.getWorkerEventInfo()
+
+  // The description previously duplicated the /workerEventInfo text; it now describes
+  // what this POST endpoint actually does with its ADD/REMOVE query parameters.
+  @Path("/exclude")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "Manually add workers to or remove workers from the excluded workers of the master. The query parameters ADD and REMOVE specify the workers to add or remove.")
+  @POST
+  def excludeWorkers(
+      @QueryParam("ADD") addWorkers: String,
+      @QueryParam("REMOVE") removeWorkers: String): String = {
+    httpService.exclude(normalizeParam(addWorkers), normalizeParam(removeWorkers))
+  }
+}
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
new file mode 100644
index 000000000..f5e06e6a5
--- /dev/null
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master
+
+import javax.ws.rs.core.MediaType
+
+import com.google.common.io.Files
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.server.common.HttpService
+import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
+
+/**
+ * Calls each master HTTP endpoint declared in `ApiMasterResource` against a non-HA
+ * [[Master]] started on random ports, and checks that every call returns HTTP 200.
+ */
+class ApiMasterResourceSuite extends ApiBaseResourceSuite {
+  private var master: Master = _
+
+  override protected def httpService: HttpService = master
+
+  def getTmpDir(): String = {
+    val tmpDir = Files.createTempDir()
+    tmpDir.deleteOnExit()
+    tmpDir.getAbsolutePath
+  }
+
+  override def beforeAll(): Unit = {
+    // The HTTP port is derived as masterPort + 1, and MASTER_HTTP_PORT only accepts ports
+    // in [1024, 65535). Capping the random master port at 65533 keeps the derived HTTP port
+    // valid whether or not selectRandomPort treats its upper bound as inclusive.
+    val randomMasterPort = Utils.selectRandomPort(1024, 65533)
+    val randomHttpPort = randomMasterPort + 1
+    celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
+    celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
+    celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
+    celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
+    celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
+
+    val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
+
+    val masterArgs = new MasterArguments(args, celebornConf)
+    master = new Master(celebornConf, masterArgs)
+    // Master.initialize() is run on a separate thread so beforeAll can continue.
+    new Thread() {
+      override def run(): Unit = {
+        master.initialize()
+      }
+    }.start()
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+    master.rpcEnv.shutdown()
+  }
+
+  test("masterGroupInfo") {
+    val response = webTarget.path("masterGroupInfo").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("lostWorkers") {
+    val response = webTarget.path("lostWorkers").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("excludedWorkers") {
+    val response = webTarget.path("excludedWorkers").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("shutdownWorkers") {
+    val response = webTarget.path("shutdownWorkers").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("hostnames") {
+    val response = webTarget.path("hostnames").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("sendWorkerEvent") {
+    val response = webTarget.path("sendWorkerEvent")
+      .request(MediaType.TEXT_PLAIN)
+      .post(null)
+    assert(200 == response.getStatus)
+  }
+
+  test("workerEventInfo") {
+    val response = webTarget.path("workerEventInfo").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("exclude") {
+    val response = webTarget.path("exclude").request(MediaType.TEXT_PLAIN).post(null)
+    assert(200 == response.getStatus)
+  }
+}
diff --git a/pom.xml b/pom.xml
index fdd60e69f..00a718a4c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,7 +80,6 @@
<google.jsr305.version>1.3.9</google.jsr305.version>
<grpc.version>1.44.0</grpc.version>
<guava.version>32.1.3-jre</guava.version>
- <javaxservlet.version>3.1.0</javaxservlet.version>
<junit.version>4.13.2</junit.version>
<leveldb.version>1.8</leveldb.version>
<log4j2.version>2.17.2</log4j2.version>
@@ -107,6 +106,13 @@
<hikaricp.version>4.0.3</hikaricp.version>
<h2.version>2.2.224</h2.version>
+ <!-- RESTful service dependencies -->
+ <swagger.version>2.2.1</swagger.version>
+ <swagger-ui.version>4.9.1</swagger-ui.version>
+ <jersey.version>2.39.1</jersey.version>
+ <jetty.version>9.4.52.v20230823</jetty.version>
+ <jakarta.servlet-api.version>4.0.4</jakarta.servlet-api.version>
+
<shading.prefix>org.apache.celeborn.shaded</shading.prefix>
<maven.plugin.antrun.version>3.0.0</maven.plugin.antrun.version>
@@ -355,11 +361,6 @@
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
- <dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>javax.servlet-api</artifactId>
- <version>${javaxservlet.version}</version>
- </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
@@ -457,6 +458,105 @@
<scope>test</scope>
</dependency>
+ <!-- RESTful service dependencies -->
+ <dependency>
+ <groupId>jakarta.servlet</groupId>
+ <artifactId>jakarta.servlet-api</artifactId>
+ <version>${jakarta.servlet-api.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-server</artifactId>
+ <version>${jersey.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>jakarta.xml.bind</groupId>
+ <artifactId>jakarta.xml.bind-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-json-jackson</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ <version>${jersey.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>jakarta.activation</groupId>
+ <artifactId>jakarta.activation-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+ <artifactId>jersey-test-framework-provider-jetty</artifactId>
+ <version>${jersey.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-continuation</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.swagger.core.v3</groupId>
+ <artifactId>swagger-jaxrs2</artifactId>
+ <version>${swagger.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.activation</groupId>
+ <artifactId>jakarta.activation</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.javassist</groupId>
+ <artifactId>javassist</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!--
+ 1. This library only contains swagger-ui static resource
(.html/.css/.js/.png), for more detail, see
+ https://github.com/swagger-api/swagger-ui/blob/master/dist/
+ 2. Note that when trying to upgrade swagger-ui, we should also update
the version in the file(
+
service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala).
+ -->
+ <dependency>
+ <groupId>org.webjars</groupId>
+ <artifactId>swagger-ui</artifactId>
+ <version>${swagger-ui.version}</version>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 0cd12ac3e..906ca112b 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -45,7 +45,6 @@ object Dependencies {
val findbugsVersion = "1.3.9"
val guavaVersion = "32.1.3-jre"
val hadoopVersion = "3.3.6"
- val javaxServletVersion = "3.1.0"
val junitInterfaceVersion = "0.13.3"
// don't forget update `junitInterfaceVersion` when we upgrade junit
val junitVersion = "4.13.2"
@@ -67,6 +66,11 @@ object Dependencies {
val mybatisVersion = "3.5.15"
val hikaricpVersion = "4.0.3"
val h2Version = "2.2.224"
+ val swaggerVersion = "2.2.1"
+ val swaggerUiVersion = "4.9.1"
+ val jerseyVersion = "2.39.1"
+ val jettyVersion = "9.4.52.v20230823"
+ val jakartaServeletApiVersion = "4.0.4"
// For SSL support
val bouncycastleVersion = "1.77"
@@ -105,7 +109,6 @@ object Dependencies {
val ioDropwizardMetricsJvm = "io.dropwizard.metrics" % "metrics-jvm" %
metricsVersion
val ioNetty = "io.netty" % "netty-all" % nettyVersion excludeAll(
ExclusionRule("io.netty", "netty-handler-ssl-ocsp"))
- val javaxServletApi = "javax.servlet" % "javax.servlet-api" %
javaxServletVersion
val leveldbJniAll = "org.fusesource.leveldbjni" % "leveldbjni-all" %
leveldbJniVersion
val log4j12Api = "org.apache.logging.log4j" % "log4j-1.2-api" % log4j2Version
val log4jSlf4jImpl = "org.apache.logging.log4j" % "log4j-slf4j-impl" %
log4j2Version
@@ -133,6 +136,23 @@ object Dependencies {
val zstdJni = "com.github.luben" % "zstd-jni" % zstdJniVersion
val mybatis = "org.mybatis" % "mybatis" % mybatisVersion
val hikaricp = "com.zaxxer" % "HikariCP" % hikaricpVersion
+ val jettyServer = "org.eclipse.jetty" % "jetty-server" % jettyVersion
excludeAll(
+ ExclusionRule("javax.servlet", "javax.servlet-api"))
+ val jettyServlet = "org.eclipse.jetty" % "jetty-servlet" % jettyVersion
excludeAll(
+ ExclusionRule("javax.servlet", "javax.servlet-api"))
+ val jettyProxy = "org.eclipse.jetty" % "jetty-proxy" % jettyVersion
+ val jakartaServletApi = "jakarta.servlet" % "jakarta.servlet-api" %
jakartaServeletApiVersion
+ val jerseyServer = "org.glassfish.jersey.core" % "jersey-server" %
jerseyVersion excludeAll(
+ ExclusionRule("jakarta.xml.bind", "jakarta.xml.bind-api"))
+ val jerseyContainerServletCore = "org.glassfish.jersey.containers" %
"jersey-container-servlet-core" % jerseyVersion
+ val jerseyHk2 = "org.glassfish.jersey.inject" % "jersey-hk2" % jerseyVersion
+ val jerseyMediaJsonJackson = "org.glassfish.jersey.media" %
"jersey-media-json-jackson" % jerseyVersion
+ val jerseyMediaMultipart = "org.glassfish.jersey.media" %
"jersey-media-multipart" % jerseyVersion
+ val swaggerJaxrs2 = "io.swagger.core.v3" % "swagger-jaxrs2" %swaggerVersion
excludeAll(
+ ExclusionRule("com.sun.activation", "jakarta.activation"),
+ ExclusionRule("org.javassist", "javassist"),
+ ExclusionRule("jakarta.activation", "jakarta.activation-api"))
+ val swaggerUi = "org.webjars" % "swagger-ui" % swaggerUiVersion
// Test dependencies
// https://www.scala-sbt.org/1.x/docs/Testing.html
@@ -143,6 +163,10 @@ object Dependencies {
val scalatestMockito = "org.mockito" %% "mockito-scala-scalatest" %
scalatestMockitoVersion
val scalatest = "org.scalatest" %% "scalatest" % scalatestVersion
val h2 = "com.h2database" % "h2" % h2Version
+ val jerseyTestFrameworkCore = "org.glassfish.jersey.test-framework" %
"jersey-test-framework-core" % jerseyVersion
+ val jerseyTestFrameworkProviderJetty =
"org.glassfish.jersey.test-framework.providers" %
"jersey-test-framework-provider-jetty" % jerseyVersion excludeAll(
+ ExclusionRule("org.eclipse.jetty", "jetty-util"),
+ ExclusionRule("org.eclipse.jetty", "jetty-continuation"))
// SSL support
val bouncycastleBcprovJdk18on = "org.bouncycastle" % "bcprov-jdk18on" %
bouncycastleVersion % "test"
@@ -458,21 +482,34 @@ object CelebornService {
Dependencies.findbugsJsr305,
Dependencies.commonsIo,
Dependencies.ioNetty,
- Dependencies.javaxServletApi,
Dependencies.commonsCrypto,
Dependencies.slf4jApi,
Dependencies.mybatis,
Dependencies.hikaricp,
+ Dependencies.swaggerJaxrs2,
+ Dependencies.swaggerUi,
+ Dependencies.jakartaServletApi,
+ Dependencies.jerseyServer,
+ Dependencies.jerseyContainerServletCore,
+ Dependencies.jerseyHk2,
+ Dependencies.jerseyMediaJsonJackson,
+ Dependencies.jerseyMediaMultipart,
+ Dependencies.jettyServer,
+ Dependencies.jettyServlet,
+ Dependencies.jettyProxy,
Dependencies.log4jSlf4jImpl % "test",
Dependencies.log4j12Api % "test",
- Dependencies.h2 % "test"
+ Dependencies.h2 % "test",
+ Dependencies.jerseyTestFrameworkCore % "test",
+ Dependencies.jerseyTestFrameworkProviderJetty % "test"
) ++ commonUnitTestDependencies
)
}
object CelebornMaster {
lazy val master = Project("celeborn-master", file("master"))
- .dependsOn(CelebornCommon.common, CelebornService.service)
+ .dependsOn(CelebornCommon.common)
+ .dependsOn(CelebornService.service % "test->test;compile->compile")
.settings (
commonSettings,
protoSettings,
@@ -497,6 +534,7 @@ object CelebornWorker {
lazy val worker = Project("celeborn-worker", file("worker"))
.dependsOn(CelebornService.service)
.dependsOn(CelebornCommon.common % "test->test;compile->compile")
+ .dependsOn(CelebornService.service % "test->test;compile->compile")
.dependsOn(CelebornClient.client % "test->compile")
.dependsOn(CelebornMaster.master % "test->compile")
.settings (
@@ -516,7 +554,9 @@ object CelebornWorker {
Dependencies.leveldbJniAll,
Dependencies.roaringBitmap,
Dependencies.rocksdbJni,
- Dependencies.scalatestMockito % "test"
+ Dependencies.scalatestMockito % "test",
+ Dependencies.jerseyTestFrameworkCore % "test",
+ Dependencies.jerseyTestFrameworkProviderJetty % "test"
) ++ commonUnitTestDependencies
)
}
@@ -724,7 +764,9 @@ trait SparkClientProjects {
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "test",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test",
- "org.apache.spark" %% "spark-core" % sparkVersion % "test"
classifier "tests",
+ "org.apache.spark" %% "spark-core" % sparkVersion % "test"
classifier "tests" excludeAll(
+ ExclusionRule("org.glassfish.jersey.inject", "*"),
+ ExclusionRule("org.glassfish.jersey.core", "*")),
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier
"tests"
) ++ commonUnitTestDependencies
)
diff --git a/service/pom.xml b/service/pom.xml
index faf80a142..b6d0da739 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -29,6 +29,36 @@
<packaging>jar</packaging>
<name>Celeborn Service</name>
+ <dependencyManagement>
+ <dependencies>
+ <!-- RESTful service dependencies; declared here to avoid impacting the sbt
mr dependencies -->
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${jetty.version}</version>
+ <exclusions>
+ <!--
+ Use `jakarta.servlet-api` instead.
+ -->
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-proxy</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
<groupId>org.apache.celeborn</groupId>
@@ -43,10 +73,6 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
- <dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>javax.servlet-api</artifactId>
- </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
@@ -75,6 +101,62 @@
<scope>test</scope>
</dependency>
+ <!-- RESTful dependencies -->
+ <dependency>
+ <groupId>io.swagger.core.v3</groupId>
+ <artifactId>swagger-jaxrs2</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.webjars</groupId>
+ <artifactId>swagger-ui</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.servlet</groupId>
+ <artifactId>jakarta.servlet-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-server</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-json-jackson</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-proxy</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
@@ -91,5 +173,15 @@
<artifactId>log4j-1.2-api</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+ <artifactId>jersey-test-framework-provider-jetty</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/service/src/main/resources/org/apache/celeborn/swagger/index.html
b/service/src/main/resources/org/apache/celeborn/swagger/index.html
new file mode 100644
index 000000000..2e0c4c687
--- /dev/null
+++ b/service/src/main/resources/org/apache/celeborn/swagger/index.html
@@ -0,0 +1,73 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <meta charset="UTF-8">
+ <title>Apache Celeborn (Incubating) REST API Documentation</title>
+ <link rel="stylesheet" type="text/css"
href="../swagger-static/swagger-ui.css" />
+ <link rel="icon" type="image/png"
href="../swagger-static/favicon-32x32.png" sizes="32x32" />
+ <link rel="icon" type="image/png"
href="../swagger-static/favicon-16x16.png" sizes="16x16" />
+ <style>
+ html
+ {
+ box-sizing: border-box;
+ overflow: -moz-scrollbars-vertical;
+ overflow-y: scroll;
+ }
+
+ *,
+ *:before,
+ *:after
+ {
+ box-sizing: inherit;
+ }
+
+ body
+ {
+ margin:0;
+ background: #fafafa;
+ }
+ </style>
+ </head>
+ <body>
+ <div id="swagger-ui"></div>
+
+ <script src="../swagger-static/swagger-ui-bundle.js" charset="UTF-8">
</script>
+ <script src="../swagger-static/swagger-ui-standalone-preset.js"
charset="UTF-8"> </script>
+ <script>
+ window.onload = function() {
+ // Begin Swagger UI call region
+ window.ui = SwaggerUIBundle({
+ url: location.origin + "/openapi.json",
+ dom_id: '#swagger-ui',
+ deepLinking: true,
+ presets: [
+ SwaggerUIBundle.presets.apis,
+ SwaggerUIStandalonePreset
+ ],
+ plugins: [
+ SwaggerUIBundle.plugins.DownloadUrl
+ ],
+ layout: "StandaloneLayout"
+ });
+ // End Swagger UI call region
+ };
+ </script>
+ </body>
+</html>
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
b/service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
similarity index 96%
rename from
common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
rename to
service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
index 82311bea9..ab919bb14 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
+++
b/service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
@@ -25,11 +25,12 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
+import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.{METRICS_JSON_PATH,
METRICS_PROMETHEUS_PATH}
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.metrics.sink.{JsonServlet,
PrometheusServlet, ServletHttpRequestHandler, Sink}
+import org.apache.celeborn.common.metrics.sink.{JsonServlet,
PrometheusServlet, Sink}
import org.apache.celeborn.common.metrics.source.Source
import org.apache.celeborn.common.util.Utils
@@ -51,7 +52,7 @@ class MetricsSystem(
metricsConfig.initialize()
- def getServletHandlers: Array[ServletHttpRequestHandler] = {
+ def getServletContextHandlers: Array[ServletContextHandler] = {
require(running, "Can only call getServletHandlers on a running
MetricsSystem")
prometheusServlet.map(_.getHandlers(conf)).getOrElse(Array()) ++
jsonServlet.map(_.getHandlers(conf)).getOrElse(Array())
@@ -139,7 +140,7 @@ class MetricsSystem(
prometheusServlet = Some(servlet.newInstance(
kv._2,
registry,
- sources.asScala,
+ sources.asScala.toSeq,
prometheusServletPath).asInstanceOf[PrometheusServlet])
} else if (kv._1 == "jsonServlet") {
val servlet = Utils.classForName(classPath)
@@ -152,7 +153,7 @@ class MetricsSystem(
jsonServlet = Some(servlet.newInstance(
kv._2,
registry,
- sources.asScala,
+ sources.asScala.toSeq,
jsonServletPath,
conf.metricsJsonPrettyEnabled.asInstanceOf[Object]).asInstanceOf[JsonServlet])
} else {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
similarity index 77%
rename from
common/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
rename to
service/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
index 325826164..9b9ba7e12 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
+++
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala
@@ -16,17 +16,19 @@
*/
package org.apache.celeborn.common.metrics.sink
+import org.eclipse.jetty.servlet.ServletContextHandler
+
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.source.Source
abstract class AbstractServlet(sources: Seq[Source]) extends Sink with Logging
{
- def getHandlers(conf: CelebornConf): Array[ServletHttpRequestHandler] = {
- Array[ServletHttpRequestHandler](
- createHttpRequestHandler())
+ def getHandlers(conf: CelebornConf): Array[ServletContextHandler] = {
+ Array[ServletContextHandler](
+ createServletHandler())
}
- def createHttpRequestHandler(): ServletHttpRequestHandler
+ def createServletHandler(): ServletContextHandler
def getMetricsSnapshot: String = {
sources.map(_.getMetrics).mkString
@@ -38,11 +40,3 @@ abstract class AbstractServlet(sources: Seq[Source]) extends
Sink with Logging {
override def report(): Unit = {}
}
-
-abstract class ServletHttpRequestHandler(path: String) extends Logging {
-
- def handleRequest(uri: String): String
-
- def getServletPath(): String = path
-
-}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
similarity index 95%
rename from
common/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
rename to
service/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
index 7a2b8b52c..4ed2c5a3a 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
+++
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala
@@ -24,10 +24,12 @@ import scala.collection.mutable.ArrayBuffer
import com.codahale.metrics.MetricRegistry
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{ClassTagExtensions,
DefaultScalaModule}
-import io.netty.channel.ChannelHandler.Sharable
+import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.celeborn.common.metrics.{CelebornHistogram, CelebornTimer,
ResettableSlidingWindowReservoir}
-import org.apache.celeborn.common.metrics.source.{AbstractSource,
NamedCounter, NamedGauge, NamedHistogram, NamedTimer, Source}
+import org.apache.celeborn.common.metrics.source._
+import org.apache.celeborn.server.common.http.HttpUtils
+import org.apache.celeborn.server.common.http.HttpUtils.ServletParams
object JsonConverter {
val mapper = new ObjectMapper() with ClassTagExtensions
@@ -72,8 +74,10 @@ class JsonServlet(
}
}
- override def createHttpRequestHandler(): ServletHttpRequestHandler = {
- new JsonHttpRequestHandler(servletPath, this)
+ override def createServletHandler(): ServletContextHandler = {
+ HttpUtils.createServletHandler(
+ servletPath,
+ new ServletParams(_ => getMetricsSnapshot, "text/json"))
}
override def stop(): Unit = {}
@@ -344,12 +348,3 @@ class JsonServlet(
}
}
}
-
-@Sharable
-class JsonHttpRequestHandler(path: String, jsonServlet: JsonServlet)
- extends ServletHttpRequestHandler(path) {
-
- override def handleRequest(uri: String): String = {
- jsonServlet.getMetricsSnapshot
- }
-}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
similarity index 73%
rename from
common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
rename to
service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
index 4b4548421..27797c8fc 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
+++
b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
@@ -20,9 +20,11 @@ package org.apache.celeborn.common.metrics.sink
import java.util.Properties
import com.codahale.metrics.MetricRegistry
-import io.netty.channel.ChannelHandler.Sharable
+import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.celeborn.common.metrics.source.Source
+import org.apache.celeborn.server.common.http.HttpUtils
+import org.apache.celeborn.server.common.http.HttpUtils.ServletParams
class PrometheusServlet(
val property: Properties,
@@ -30,17 +32,9 @@ class PrometheusServlet(
val sources: Seq[Source],
val servletPath: String) extends AbstractServlet(sources) {
- override def createHttpRequestHandler(): ServletHttpRequestHandler = {
- new PrometheusHttpRequestHandler(servletPath, this)
- }
-}
-
-@Sharable
-class PrometheusHttpRequestHandler(
- path: String,
- prometheusServlet: PrometheusServlet) extends
ServletHttpRequestHandler(path) {
-
- override def handleRequest(uri: String): String = {
- prometheusServlet.getMetricsSnapshot
+ override def createServletHandler(): ServletContextHandler = {
+ HttpUtils.createServletHandler(
+ servletPath,
+ new ServletParams(_ => getMetricsSnapshot, "text/plain"))
}
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index f13752b81..18e016187 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -23,7 +23,8 @@ import scala.collection.JavaConverters._
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.server.common.http.{HttpRequestHandler, HttpServer,
HttpServerInitializer}
+import org.apache.celeborn.server.common.http.HttpServer
+import org.apache.celeborn.server.common.http.api.ApiRootResource
import org.apache.celeborn.server.common.service.config.ConfigLevel
abstract class HttpService extends Service with Logging {
@@ -173,18 +174,20 @@ abstract class HttpService extends Service with Logging {
def getWorkerEventInfo(): String = throw new UnsupportedOperationException()
def startHttpServer(): Unit = {
- val handlers =
- if (metricsSystem.running) {
- new HttpRequestHandler(this, metricsSystem.getServletHandlers)
- } else {
- new HttpRequestHandler(this, null)
- }
- httpServer = new HttpServer(
+ httpServer = HttpServer(
serviceName,
httpHost(),
httpPort(),
- new HttpServerInitializer(handlers))
+ httpMaxWorkerThreads(),
+ httpStopTimeout())
httpServer.start()
+ startInternal()
+ // block until the HTTP server is started, otherwise, we may get
+ // the wrong HTTP server port -1
+ while (httpServer.getState != "STARTED") {
+ logInfo(s"Waiting for $serviceName's HTTP server getting started")
+ Thread.sleep(1000)
+ }
}
private def httpHost(): String = {
@@ -205,6 +208,41 @@ abstract class HttpService extends Service with Logging {
}
}
+ private def httpMaxWorkerThreads(): Int = {
+ serviceName match {
+ case Service.MASTER =>
+ conf.masterHttpMaxWorkerThreads
+ case Service.WORKER =>
+ conf.workerHttpMaxWorkerThreads
+ }
+ }
+
+ private def httpStopTimeout(): Long = {
+ serviceName match {
+ case Service.MASTER =>
+ conf.masterHttpStopTimeout
+ case Service.WORKER =>
+ conf.workerHttpStopTimeout
+ }
+ }
+
+ def connectionUrl: String = {
+ httpServer.getServerUri
+ }
+
+ protected def startInternal(): Unit = {
+ httpServer.addHandler(ApiRootResource.getServletHandler(this))
+
httpServer.addStaticHandler("META-INF/resources/webjars/swagger-ui/4.9.1/",
"/swagger-static/")
+ httpServer.addStaticHandler("org/apache/celeborn/swagger", "/swagger")
+ httpServer.addRedirectHandler("/help", "/swagger")
+ httpServer.addRedirectHandler("/docs", "/swagger")
+ if (metricsSystem.running) {
+ metricsSystem.getServletContextHandlers.foreach { handler =>
+ httpServer.addHandler(handler)
+ }
+ }
+ }
+
override def initialize(): Unit = {
super.initialize()
startHttpServer()
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
deleted file mode 100644
index 778355873..000000000
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.server.common.http
-
-import org.apache.celeborn.server.common.{HttpService, Service}
-import org.apache.celeborn.server.common.service.config.ConfigLevel
-
-/**
- * HTTP endpoints of Rest API providers.
- */
-trait HttpEndpoint {
- def path: String
-
- def description(service: String): String
-
- def handle(service: HttpService, parameters: Map[String, String]): String
-}
-
-case object Conf extends HttpEndpoint {
- override def path: String = "/conf"
-
- override def description(service: String): String = s"List the conf setting
of the $service."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getConf
-}
-
-case object ListDynamicConfigs extends HttpEndpoint {
- override def path: String = "/listDynamicConfigs"
-
- override def description(service: String): String = s"List the dynamic
configs of the $service. " +
- s"The parameter level specifies the config level of dynamic configs. " +
- s"The parameter tenant specifies the tenant id of
${ConfigLevel.TENANT.name()} or ${ConfigLevel.TENANT_USER.name()} level. " +
- s"The parameter name specifies the user name of
${ConfigLevel.TENANT_USER.name()} level. " +
- s"Meanwhile, either none or all of the parameter tenant and name are
specified for ${ConfigLevel.TENANT_USER.name()} level."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getDynamicConfigs(
- parameters.getOrElse("LEVEL", "").trim,
- parameters.getOrElse("TENANT", "").trim,
- parameters.getOrElse("NAME", "").trim)
-}
-
-case object WorkerInfo extends HttpEndpoint {
- override def path: String = "/workerInfo"
-
- override def description(service: String): String = {
- if (service == Service.MASTER)
- "List worker information of the service. It will list all registered
workers 's information."
- else "List the worker information of the worker."
- }
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getWorkerInfo
-}
-
-case object ThreadDump extends HttpEndpoint {
- override def path: String = "/threadDump"
-
- override def description(service: String): String =
- s"List the current thread dump of the $service."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getThreadDump
-}
-
-case object Shuffles extends HttpEndpoint {
- override def path: String = "/shuffles"
-
- override def description(service: String): String = {
- if (service == Service.MASTER)
- "List all running shuffle keys of the service. It will return all
running shuffle's key of the cluster."
- else
- "List all the running shuffle keys of the worker. It only return keys of
shuffles running in that worker."
- }
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getShuffleList
-}
-
-case object Applications extends HttpEndpoint {
- override def path: String = "/applications"
-
- override def description(service: String): String =
- if (service == Service.MASTER)
- "List all running application's ids of the cluster."
- else
- "List all running application's ids of the worker. It only return
application ids running in that worker."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getApplicationList
-}
-
-case object ListTopDiskUsedApps extends HttpEndpoint {
- override def path: String = "/listTopDiskUsedApps"
-
- override def description(service: String): String = {
- if (service == Service.MASTER)
- "List the top disk usage application ids. It will return the top disk
usage application ids for the cluster."
- else
- "List the top disk usage application ids. It only return application ids
running in that worker."
- }
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.listTopDiskUseApps
-}
-
-case object Help extends HttpEndpoint {
- override def path: String = "/help"
-
- override def description(service: String): String =
- s"List the available API providers of the $service."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- HttpUtils.help(service.serviceName)
-}
-
-case object Invalid extends HttpEndpoint {
-
- val invalid = "invalid"
-
- override def path: String = None.toString
-
- override def description(service: String): String = s"Invalid uri of the
$service."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String = invalid
-}
-
-case object MasterGroupInfo extends HttpEndpoint {
- override def path: String = "/masterGroupInfo"
-
- override def description(service: String): String =
- "List master group information of the service. It will list all master's
LEADER, FOLLOWER information."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getMasterGroupInfo
-}
-
-case object LostWorkers extends HttpEndpoint {
- override def path: String = "/lostWorkers"
-
- override def description(service: String): String = "List all lost workers
of the master."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getLostWorkers
-}
-
-case object ExcludedWorkers extends HttpEndpoint {
- override def path: String = "/excludedWorkers"
-
- override def description(service: String): String = "List all excluded
workers of the master."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getExcludedWorkers
-}
-
-case object ShutdownWorkers extends HttpEndpoint {
- override def path: String = "/shutdownWorkers"
-
- override def description(service: String): String = "List all shutdown
workers of the master."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getShutdownWorkers
-}
-
-case object Hostnames extends HttpEndpoint {
- override def path: String = "/hostnames"
-
- override def description(service: String): String =
- "List all running application's LifecycleManager's hostnames of the
cluster."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getHostnameList
-}
-
-case object Exclude extends HttpEndpoint {
- override def path: String = "/exclude"
-
- override def description(service: String): String =
- "Excluded workers of the master add or remove the worker manually given
worker id. The parameter add or remove specifies the excluded workers to add or
remove, which value is separated by commas."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.exclude(parameters.getOrElse("ADD", "").trim,
parameters.getOrElse("REMOVE", "").trim)
-}
-
-case object ListPartitionLocationInfo extends HttpEndpoint {
- override def path: String = "/listPartitionLocationInfo"
-
- override def description(service: String): String =
- "List all the living PartitionLocation information in that worker."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.listPartitionLocationInfo
-}
-
-case object UnavailablePeers extends HttpEndpoint {
- override def path: String = "/unavailablePeers"
-
- override def description(service: String): String =
- "List the unavailable peers of the worker, this always means the worker
connect to the peer failed."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getUnavailablePeers
-}
-
-case object IsShutdown extends HttpEndpoint {
- override def path: String = "/isShutdown"
-
- override def description(service: String): String =
- "Show if the worker is during the process of shutdown."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.isShutdown
-}
-
-case object IsRegistered extends HttpEndpoint {
- override def path: String = "/isRegistered"
-
- override def description(service: String): String =
- "Show if the worker is registered to the master success."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.isRegistered
-}
-
-case object Exit extends HttpEndpoint {
- override def path: String = "/exit"
-
- override def description(service: String): String =
- "Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL'
and 'IMMEDIATELY'."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.exit(parameters.getOrElse("TYPE", ""))
-}
-
-case object SendWorkerEvent extends HttpEndpoint {
- override def path: String = "/sendWorkerEvent"
-
- override def description(service: String): String =
- "For Master(Leader) can send worker event to manager workers. Legal types
are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful',
'Recommission'"
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.handleWorkerEvent(parameters.getOrElse("TYPE", ""),
parameters.getOrElse("WORKERS", ""))
-}
-
-case object WorkerEventInfo extends HttpEndpoint {
- override def path: String = "/workerEventInfo"
-
- override def description(service: String): String =
- "List all worker event infos of the master."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getWorkerEventInfo()
-}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
deleted file mode 100644
index 531f19454..000000000
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.server.common.http
-
-import io.netty.buffer.Unpooled
-import io.netty.channel.{ChannelFutureListener, ChannelHandlerContext,
SimpleChannelInboundHandler}
-import io.netty.channel.ChannelHandler.Sharable
-import io.netty.handler.codec.http._
-import io.netty.util.CharsetUtil
-
-import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.metrics.sink.{JsonHttpRequestHandler,
ServletHttpRequestHandler}
-import org.apache.celeborn.server.common.HttpService
-
-/**
- * A handler for the REST API that defines how to handle the HTTP request
given a message.
- *
- * @param service The service of HTTP server.
- * @param uri The uri of HTTP request.
- */
-@Sharable
-class HttpRequestHandler(
- service: HttpService,
- servletHttpRequestHandlers: Array[ServletHttpRequestHandler])
- extends SimpleChannelInboundHandler[FullHttpRequest] with Logging {
-
- override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
- ctx.flush()
- }
-
- override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest):
Unit = {
- val uri = req.uri()
- val (path, parameters) = HttpUtils.parseUri(uri)
- val msg = HttpUtils.handleRequest(service, path, parameters)
- val textType = "text/plain; charset=UTF-8"
- val jsonType = "application/json"
- val (response, contentType) = msg match {
- case Invalid.invalid =>
- if (servletHttpRequestHandlers != null) {
- servletHttpRequestHandlers.find(servlet =>
- uri == servlet.getServletPath()).map {
- case jsonHandler: JsonHttpRequestHandler =>
- (jsonHandler.handleRequest(uri), jsonType)
- case handler: ServletHttpRequestHandler =>
- (handler.handleRequest(uri), textType)
- }.getOrElse((s"Unknown path $uri!", textType))
- } else {
- (
- s"${Invalid.description(service.serviceName)}
${HttpUtils.help(service.serviceName)}",
- textType)
- }
- case _ => (msg, textType)
- }
-
- val res = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1,
- HttpResponseStatus.OK,
- Unpooled.copiedBuffer(response, CharsetUtil.UTF_8))
- res.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType)
- ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE)
- }
-}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
index a45ff109a..336132576 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
@@ -17,75 +17,108 @@
package org.apache.celeborn.server.common.http
-import java.net.InetSocketAddress
-import java.util.concurrent.TimeUnit
-
-import io.netty.bootstrap.ServerBootstrap
-import io.netty.channel.{ChannelFuture, ChannelInitializer}
-import io.netty.channel.socket.nio.NioServerSocketChannel
-import io.netty.handler.logging.{LoggingHandler, LogLevel}
+import org.apache.commons.lang3.SystemUtils
+import org.eclipse.jetty.server.{Handler, HttpConfiguration,
HttpConnectionFactory, Server, ServerConnector}
+import org.eclipse.jetty.server.handler.{ContextHandlerCollection,
ErrorHandler}
+import org.eclipse.jetty.util.component.LifeCycle
+import org.eclipse.jetty.util.thread.{QueuedThreadPool,
ScheduledExecutorScheduler}
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.network.util.{IOMode, NettyUtils}
-import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.common.util.CelebornExitKind
-class HttpServer(
+private[celeborn] case class HttpServer(
role: String,
- host: String,
- port: Int,
- channelInitializer: ChannelInitializer[_]) extends Logging {
+ server: Server,
+ connector: ServerConnector,
+ rootHandler: ContextHandlerCollection) extends Logging {
- private var bootstrap: ServerBootstrap = _
- private var bindFuture: ChannelFuture = _
@volatile private var isStarted = false
@throws[Exception]
def start(): Unit = synchronized {
- val boss = NettyUtils.createEventLoop(IOMode.NIO, 1, role + "-http-boss")
- val worker = NettyUtils.createEventLoop(IOMode.NIO, 2, role +
"-http-worker")
- bootstrap = new ServerBootstrap
- bootstrap
- .group(boss, worker)
- .handler(new LoggingHandler(LogLevel.DEBUG))
- .channel(classOf[NioServerSocketChannel])
- .childHandler(channelInitializer)
-
- val address = new InetSocketAddress(host, port)
- bindFuture = bootstrap.bind(address).sync
- bindFuture.syncUninterruptibly()
- logInfo(s"$role: HttpServer started on ${address.getHostString}:$port.")
- isStarted = true
+ try {
+ server.start()
+ connector.start()
+ server.addConnector(connector)
+ logInfo(s"$role: HttpServer started on
${connector.getHost}:${connector.getPort}.")
+ isStarted = true
+ } catch {
+ case e: Exception =>
+ stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ throw e
+ }
}
def stop(exitCode: Int): Unit = synchronized {
if (isStarted) {
- logInfo(s"$role: Stopping HttpServer")
- if (bindFuture != null) {
- // close is a local operation and should finish within milliseconds;
timeout just to be safe
- bindFuture.channel.close.awaitUninterruptibly(10, TimeUnit.SECONDS)
- bindFuture = null
+ if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) {
+ server.setStopTimeout(0)
}
- if (bootstrap != null && bootstrap.config.group != null) {
- Utils.tryLogNonFatalError {
- if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
- bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
- } else {
- bootstrap.config.group.shutdownGracefully(0, 0, TimeUnit.SECONDS)
- }
- }
- }
- if (bootstrap != null && bootstrap.config.childGroup != null) {
- Utils.tryLogNonFatalError {
- if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
- bootstrap.config.childGroup.shutdownGracefully(3, 5,
TimeUnit.SECONDS)
- } else {
- bootstrap.config.childGroup.shutdownGracefully(0, 0,
TimeUnit.SECONDS)
- }
- }
+ logInfo(s"$role: Stopping HttpServer")
+ server.stop()
+ connector.stop()
+ server.getThreadPool match {
+ case lifeCycle: LifeCycle => lifeCycle.stop()
+ case _ =>
}
- bootstrap = null
logInfo(s"$role: HttpServer stopped.")
isStarted = false
}
}
+ def getServerUri: String = connector.getHost + ":" + connector.getLocalPort
+
+ def addHandler(handler: Handler): Unit = synchronized {
+ rootHandler.addHandler(handler)
+ if (!handler.isStarted) handler.start()
+ }
+
+ def addStaticHandler(
+ resourceBase: String,
+ contextPath: String): Unit = {
+ addHandler(HttpUtils.createStaticHandler(resourceBase, contextPath))
+ }
+
+ def addRedirectHandler(
+ src: String,
+ dest: String): Unit = {
+ addHandler(HttpUtils.createRedirectHandler(src, dest))
+ }
+
+ def getState: String = server.getState
+}
+
+object HttpServer {
+
+ def apply(role: String, host: String, port: Int, poolSize: Int, stopTimeout:
Long): HttpServer = {
+ val pool = new QueuedThreadPool(poolSize)
+ pool.setName(s"$role-JettyThreadPool")
+ pool.setDaemon(true)
+ val server = new Server(pool)
+ server.setStopTimeout(stopTimeout)
+
+ val errorHandler = new ErrorHandler()
+ errorHandler.setShowStacks(true)
+ errorHandler.setServer(server)
+ server.addBean(errorHandler)
+
+ val collection = new ContextHandlerCollection
+ server.setHandler(collection)
+
+ val serverExecutor = new
ScheduledExecutorScheduler(s"$role-JettyScheduler", true)
+ val httpConf = new HttpConfiguration()
+ val connector = new ServerConnector(
+ server,
+ null,
+ serverExecutor,
+ null,
+ -1,
+ -1,
+ new HttpConnectionFactory(httpConf))
+ connector.setHost(host)
+ connector.setPort(port)
+ connector.setReuseAddress(!SystemUtils.IS_OS_WINDOWS)
+ connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8))
+
+ new HttpServer(role, server, connector, collection)
+ }
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
index 4ccb127e5..b9b6a3ec2 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
@@ -18,78 +18,111 @@
package org.apache.celeborn.server.common.http
import java.net.URL
-import java.util.Locale
-
-import org.apache.celeborn.server.common.{HttpService, Service}
-
-object HttpUtils {
-
- private val baseEndpoints: List[HttpEndpoint] =
- List(
- Conf,
- ListDynamicConfigs,
- WorkerInfo,
- ThreadDump,
- Shuffles,
- Applications,
- ListTopDiskUsedApps,
- Help)
- private val masterEndpoints: List[HttpEndpoint] = List(
- MasterGroupInfo,
- LostWorkers,
- ExcludedWorkers,
- ShutdownWorkers,
- Hostnames,
- SendWorkerEvent,
- WorkerEventInfo,
- Exclude) ++ baseEndpoints
- private val workerEndpoints: List[HttpEndpoint] =
- List(
- ListPartitionLocationInfo,
- UnavailablePeers,
- IsShutdown,
- IsRegistered,
- Exit) ++ baseEndpoints
-
- def parseUri(uri: String): (String, Map[String, String]) = {
- val url = new URL(s"https://127.0.0.1:9000$uri")
- val parameter =
- if (url.getQuery == null) {
- Map.empty[String, String]
- } else {
- url.getQuery
- .split("&")
- .map(_.split("="))
- .map(arr => arr(0).toUpperCase(Locale.ROOT) -> arr(1)).toMap
+import javax.servlet.http.{HttpServlet, HttpServletRequest,
HttpServletResponse}
+
+import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler,
ServletHolder}
+
+import org.apache.celeborn.common.exception.CelebornException
+import org.apache.celeborn.common.internal.Logging
+
+private[celeborn] object HttpUtils extends Logging {
+ // Base type for a function that returns something based on an HTTP request.
Allows for
+ // implicit conversion from many types of functions to jetty Handlers.
+ type Responder[T] = HttpServletRequest => T
+
+ class ServletParams[T <: AnyRef](
+ val responder: Responder[T],
+ val contentType: String,
+ val extractFn: T => String = (in: Any) => in.toString) {}
+
+ /** Create a context handler that responds to a request with the given path
prefix */
+ def createServletHandler[T <: AnyRef](
+ path: String,
+ servletParams: ServletParams[T]): ServletContextHandler = {
+ createServletHandler(path, createServlet(servletParams))
+ }
+
+ private def createServlet[T <: AnyRef](servletParams: ServletParams[T]):
HttpServlet = {
+ new HttpServlet {
+ override def doGet(request: HttpServletRequest, response:
HttpServletResponse): Unit = {
+ try {
+
response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
+ response.setStatus(HttpServletResponse.SC_OK)
+ val result = servletParams.responder(request)
+ response.getWriter.print(servletParams.extractFn(result))
+ } catch {
+ case e: IllegalArgumentException =>
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST,
e.getMessage)
+ case e: Exception =>
+ logWarning(s"GET ${request.getRequestURI} failed: $e", e)
+ throw e
+ }
}
- (url.getPath, parameter)
+
+ override protected def doTrace(req: HttpServletRequest, res:
HttpServletResponse): Unit = {
+ res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
+ }
+ }
}
- def handleRequest(
- service: HttpService,
- path: String,
- parameters: Map[String, String]): String = {
- endpoints(service.serviceName).find(endpoint => endpoint.path ==
path).orElse(
- Some(Invalid)).get.handle(
- service,
- parameters)
+ /**
+ * Create a handler for serving files from a static directory
+ *
+ * @param resourceBase the resource directory contains static resource files
+ * @param contextPath the content path to set for the handler
+ * @return a static [[ServletContextHandler]]
+ */
+ def createStaticHandler(
+ resourceBase: String,
+ contextPath: String): ServletContextHandler = {
+ val contextHandler = new ServletContextHandler()
+ val holder = new ServletHolder(classOf[DefaultServlet])
+
Option(Thread.currentThread().getContextClassLoader.getResource(resourceBase))
match {
+ case Some(res) =>
+ holder.setInitParameter("resourceBase", res.toString)
+ case None =>
+ throw new CelebornException("Could not find resource path for Web UI:
" + resourceBase)
+ }
+ contextHandler.setContextPath(contextPath)
+ contextHandler.addServlet(holder, "/")
+ contextHandler
}
- def help(service: String): String = {
- val sb = new StringBuilder
- sb.append("Available API providers include:\n")
- val httpEndpoints: List[HttpEndpoint] = endpoints(service)
- val maxLength = httpEndpoints.map(_.path.length).max
- httpEndpoints.sortBy(_.path).foreach(endpoint =>
- sb.append(
- s"${endpoint.path.padTo(maxLength, " ").mkString}
${endpoint.description(service)}\n"))
- sb.toString
+ def createServletHandler(contextPath: String, servlet: HttpServlet):
ServletContextHandler = {
+ val handler = new ServletContextHandler()
+ val holder = new ServletHolder(servlet)
+ handler.setContextPath(contextPath)
+ handler.addServlet(holder, "/")
+ handler
}
- private def endpoints(service: String): List[HttpEndpoint] = {
- if (service == Service.MASTER)
- masterEndpoints
- else
- workerEndpoints
+ def createRedirectHandler(src: String, dest: String): ServletContextHandler
= {
+ val redirectedServlet = new HttpServlet {
+ private def doReq(req: HttpServletRequest, resp: HttpServletResponse):
Unit = {
+ val newURL = new URL(new URL(req.getRequestURL.toString),
dest).toString
+ resp.sendRedirect(newURL)
+ }
+ override def doGet(req: HttpServletRequest, resp: HttpServletResponse):
Unit = {
+ doReq(req, resp)
+ }
+
+ override def doPut(req: HttpServletRequest, resp: HttpServletResponse):
Unit = {
+ doReq(req, resp)
+ }
+
+ override def doPost(req: HttpServletRequest, resp: HttpServletResponse):
Unit = {
+ doReq(req, resp)
+ }
+
+ override def doDelete(req: HttpServletRequest, resp:
HttpServletResponse): Unit = {
+ doReq(req, resp)
+ }
+
+ override protected def doTrace(req: HttpServletRequest, resp:
HttpServletResponse): Unit = {
+ resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
+ }
+ }
+
+ createServletHandler(src, redirectedServlet)
}
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiBaseResource.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiBaseResource.scala
new file mode 100644
index 000000000..45a5d2796
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiBaseResource.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.api
+
+import javax.ws.rs.{GET, Path, Produces, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.responses.ApiResponse
+
+@Path("/")
+private[api] class ApiBaseResource extends ApiRequestContext {
+ def service: String = httpService.serviceName
+
+ @GET
+ @Path("ping")
+ @Produces(Array(MediaType.TEXT_PLAIN))
+ def ping(): String = "pong"
+
+ @Path("/conf")
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.TEXT_PLAIN)),
+ description = "List the conf setting.")
+ @GET
+ def conf: String = httpService.getConf
+
+ @Path("/listDynamicConfigs")
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.TEXT_PLAIN)),
+ description = "List the dynamic configs. " +
+ "The parameter level specifies the config level of dynamic configs. " +
+ "The parameter tenant specifies the tenant id of TENANT or TENANT_USER
level. " +
+ "The parameter name specifies the user name of TENANT_USER level. " +
+ "Meanwhile, either none or all of the parameter tenant and name are
specified for TENANT_USER level.")
+ @GET
+ def listDynamicConfigs(
+ @QueryParam("LEVEL") level: String,
+ @QueryParam("TENANT") tenant: String,
+ @QueryParam("NAME") name: String): String = {
+ httpService.getDynamicConfigs(
+ normalizeParam(level),
+ normalizeParam(tenant),
+ normalizeParam(name))
+ }
+
+ @Path("/workerInfo")
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.TEXT_PLAIN)),
+ description =
+ "For MASTER: List worker information of the service. It will list all
registered workers 's information.\n" +
+ "For WORKER: List the worker information of the worker.")
+ @GET
+ def workerInfo(): String = {
+ httpService.getWorkerInfo
+ }
+
+ @Path("/threadDump")
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.TEXT_PLAIN)),
+ description = "List the current thread dump.")
+ @GET
+ def threadDump(): String = {
+ httpService.getThreadDump
+ }
+
+ @Path("shuffle")
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.TEXT_PLAIN)),
+ description =
+ "For MASTER: List all running shuffle keys of the service. It will
return all running shuffle's key of the cluster.\n" +
+ "For WORKER: List all the running shuffle keys of the worker. It only
return keys of shuffles running in that worker.")
+ @GET
+ def shuffles(): String = {
+ httpService.getShuffleList
+ }
+
+ @Path("applications")
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.TEXT_PLAIN)),
+ description =
+ "For MASTER: List all running application's ids of the cluster.\n" +
+ "For WORKER: List all running application's ids of the worker. It only
return application ids running in that worker.")
+ @GET
+ def applications(): String = {
+ httpService.getApplicationList
+ }
+
+ @Path("listTopDiskUsedApps")
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.TEXT_PLAIN)),
+ description =
+ "For MASTER: List the top disk usage application ids. It will return the
top disk usage application ids for the cluster.\n" +
+ "For WORKER: List the top disk usage application ids. It only return
application ids running in that worker.")
+ @GET
+ def listTopDiskUsedApps(): String = {
+ httpService.listTopDiskUseApps
+ }
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
new file mode 100644
index 000000000..2c293bf29
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.api
+
+import javax.servlet.ServletContext
+import javax.servlet.http.HttpServletRequest
+import javax.ws.rs.core.Context
+
+import org.eclipse.jetty.server.handler.ContextHandler
+
+import org.apache.celeborn.server.common.HttpService
+
+private[celeborn] trait ApiRequestContext {
+ @Context
+ protected var servletContext: ServletContext = _
+
+ @Context
+ protected var httpRequest: HttpServletRequest = _
+
+ final protected def httpService: HttpService =
HttpServiceContext.get(servletContext)
+
+ protected def normalizeParam(param: String): String =
Option(param).map(_.trim).getOrElse("")
+}
+
+private[celeborn] object HttpServiceContext {
+ private val attribute = getClass.getCanonicalName
+
+ def set(contextHandler: ContextHandler, rs: HttpService): Unit = {
+ contextHandler.setAttribute(attribute, rs)
+ }
+
+ def get(context: ServletContext): HttpService = {
+ context.getAttribute(attribute).asInstanceOf[HttpService]
+ }
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRootResource.scala
similarity index 53%
copy from
service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
copy to
service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRootResource.scala
index 2619606b5..6d69c1b58 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRootResource.scala
@@ -15,19 +15,22 @@
* limitations under the License.
*/
-package org.apache.celeborn.server.common.http
+package org.apache.celeborn.server.common.http.api
-import io.netty.channel.{ChannelInitializer, SimpleChannelInboundHandler}
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec}
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
+import org.glassfish.jersey.server.ResourceConfig
+import org.glassfish.jersey.servlet.ServletContainer
-class HttpServerInitializer(
- handlers: SimpleChannelInboundHandler[_]) extends
ChannelInitializer[SocketChannel] {
+import org.apache.celeborn.server.common.HttpService
- override def initChannel(channel: SocketChannel): Unit = {
- val pipeline = channel.pipeline()
- pipeline.addLast(new HttpServerCodec())
- .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
- .addLast(handlers)
+private[celeborn] object ApiRootResource {
+ def getServletHandler(rs: HttpService): ServletContextHandler = {
+ val openapiConf: ResourceConfig = new OpenAPIConfig
+ val holder = new ServletHolder(new ServletContainer(openapiConf))
+ val handler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
+ handler.setContextPath("/")
+ HttpServiceContext.set(handler, rs)
+ handler.addServlet(holder, "/*")
+ handler
}
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornOpenApiResource.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornOpenApiResource.scala
new file mode 100644
index 000000000..f0e9cc40a
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornOpenApiResource.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.api
+
+import javax.servlet.ServletConfig
+import javax.ws.rs.{GET, Path, PathParam, Produces}
+import javax.ws.rs.core.{Application, Context, HttpHeaders, MediaType,
Response, UriInfo}
+
+import scala.collection.JavaConverters._
+
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
+import io.swagger.v3.jaxrs2.integration.JaxrsOpenApiContextBuilder
+import io.swagger.v3.jaxrs2.integration.resources.BaseOpenApiResource
+import io.swagger.v3.oas.annotations.Operation
+import io.swagger.v3.oas.integration.api.OpenApiContext
+import io.swagger.v3.oas.models.OpenAPI
+import io.swagger.v3.oas.models.info.{Info, License}
+import io.swagger.v3.oas.models.servers.Server
+import org.apache.commons.lang3.StringUtils
+
+@Path("/openapi.{type:json|yaml}")
+class CelebornOpenApiResource extends BaseOpenApiResource with
ApiRequestContext {
+ @Context
+ protected var config: ServletConfig = _
+
+ @Context
+ protected var app: Application = _
+
+ @GET
+ @Produces(Array(MediaType.APPLICATION_JSON, "application/yaml"))
+ @Operation(hidden = true)
+ def getOpenApi(
+ @Context headers: HttpHeaders,
+ @Context uriInfo: UriInfo,
+ @PathParam("type") tpe: String): Response = {
+
+ val ctxId = getContextId(config)
+ val ctx: OpenApiContext = new CelebornJaxrsOpenApiContextBuilder()
+ .servletConfig(config)
+ .application(app)
+ .resourcePackages(OpenAPIConfig.packages.toSet.asJava)
+ .configLocation(configLocation)
+ .openApiConfiguration(openApiConfiguration)
+ .ctxId(ctxId)
+ .buildContext(true)
+
+ val openApi = setCelebornOpenAPIDefinition(ctx.read())
+
+ if (StringUtils.isNotBlank(tpe) && tpe.trim().equalsIgnoreCase("yaml")) {
+ Response.status(Response.Status.OK)
+ .entity(
+ ctx.getOutputYamlMapper()
+ .writer(new DefaultPrettyPrinter())
+ .writeValueAsString(openApi))
+ .`type`("application/yaml")
+ .build()
+ } else {
+ Response.status(Response.Status.OK)
+ .entity(
+ ctx.getOutputJsonMapper
+ .writer(new DefaultPrettyPrinter())
+ .writeValueAsString(openApi))
+ .`type`(MediaType.APPLICATION_JSON_TYPE)
+ .build()
+ }
+ }
+
+ private def setCelebornOpenAPIDefinition(openApi: OpenAPI): OpenAPI = {
+ // TODO: to improve when https is enabled.
+ val apiUrl = s"http://${httpService.connectionUrl}/"
+ openApi.info(
+ new Info().title(
+ s"Apache Celeborn (Incubating) REST API Documentation")
+ .description(s"Role: ${httpService.serviceName}")
+ .license(
+ new License().name("Apache License 2.0")
+ .url("https://www.apache.org/licenses/LICENSE-2.0.txt")))
+ .servers(List(new Server().url(apiUrl)).asJava)
+ }
+}
+
+class CelebornJaxrsOpenApiContextBuilder
+ extends JaxrsOpenApiContextBuilder[CelebornJaxrsOpenApiContextBuilder]
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornScalaObjectMapper.scala
similarity index 57%
copy from
service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
copy to
service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornScalaObjectMapper.scala
index 2619606b5..37375e2c1 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornScalaObjectMapper.scala
@@ -15,19 +15,15 @@
* limitations under the License.
*/
-package org.apache.celeborn.server.common.http
+package org.apache.celeborn.server.common.http.api
-import io.netty.channel.{ChannelInitializer, SimpleChannelInboundHandler}
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec}
+import javax.ws.rs.ext.ContextResolver
-class HttpServerInitializer(
- handlers: SimpleChannelInboundHandler[_]) extends
ChannelInitializer[SocketChannel] {
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
- override def initChannel(channel: SocketChannel): Unit = {
- val pipeline = channel.pipeline()
- pipeline.addLast(new HttpServerCodec())
- .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
- .addLast(handlers)
- }
+class CelebornScalaObjectMapper extends ContextResolver[ObjectMapper] {
+ private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
+ override def getContext(aClass: Class[_]): ObjectMapper = mapper
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/OpenAPIConfig.scala
similarity index 57%
rename from
service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
rename to
service/src/main/scala/org/apache/celeborn/server/common/http/api/OpenAPIConfig.scala
index 2619606b5..07053de4c 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/OpenAPIConfig.scala
@@ -15,19 +15,19 @@
* limitations under the License.
*/
-package org.apache.celeborn.server.common.http
+package org.apache.celeborn.server.common.http.api
-import io.netty.channel.{ChannelInitializer, SimpleChannelInboundHandler}
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec}
+import org.glassfish.jersey.server.ResourceConfig
-class HttpServerInitializer(
- handlers: SimpleChannelInboundHandler[_]) extends
ChannelInitializer[SocketChannel] {
+class OpenAPIConfig extends ResourceConfig {
+ packages(OpenAPIConfig.packages: _*)
+ register(classOf[CelebornOpenApiResource])
+ register(classOf[CelebornScalaObjectMapper])
+}
- override def initChannel(channel: SocketChannel): Unit = {
- val pipeline = channel.pipeline()
- pipeline.addLast(new HttpServerCodec())
- .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
- .addLast(handlers)
- }
+object OpenAPIConfig {
+ val packages = Seq(
+ "org.apache.celeborn.server.common.http.api",
+ "org.apache.celeborn.service.deploy.master.http.api",
+ "org.apache.celeborn.service.deploy.worker.http.api")
}
diff --git a/service/src/test/resources/metrics-api.properties
b/service/src/test/resources/metrics-api.properties
new file mode 100644
index 000000000..b1b2e6725
--- /dev/null
+++ b/service/src/test/resources/metrics-api.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
+*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
new file mode 100644
index 000000000..632d06c79
--- /dev/null
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+import javax.ws.rs.core.MediaType
+
+import org.apache.celeborn.common.CelebornConf
+
+/**
+ * REST API tests shared by the Celeborn HTTP service suites (e.g. the worker suite).
+ *
+ * Subclasses supply `httpService`. Metrics are enabled and pointed at the
+ * `metrics-api.properties` test resource, which configures the Prometheus and JSON servlet sinks
+ * exercised by the `metrics/prometheus` and `metrics/json` tests below.
+ */
+abstract class ApiBaseResourceSuite extends HttpTestHelper {
+  celebornConf.set(CelebornConf.METRICS_ENABLED.key, "true")
+    .set(CelebornConf.METRICS_CONF.key, testResourcePath("metrics-api.properties"))
+
+  /**
+   * Resolves a classpath resource to a file-system path.
+   *
+   * `URL.getFile` keeps the URL's percent-encoding (a checkout under `/tmp/my dir` yields
+   * `/tmp/my%20dir/...`), which does not name an existing file, so `file:` URLs are decoded
+   * through their URI. Any other scheme falls back to `getFile`, as before.
+   *
+   * @param name resource name relative to the classpath root
+   * @return the decoded path of the resource
+   * @throws IllegalArgumentException if the resource is not on the classpath
+   */
+  private def testResourcePath(name: String): String = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    require(url != null, s"Test resource $name not found on the classpath")
+    if (url.getProtocol == "file") new java.io.File(url.toURI).getAbsolutePath
+    else url.getFile
+  }
+
+  /** Sends a GET for `path` (relative to the service root) accepting `mediaType`. */
+  private def httpGet(path: String, mediaType: String) =
+    webTarget.path(path).request(mediaType).get()
+
+  test("ping") {
+    val response = httpGet("ping", MediaType.TEXT_PLAIN)
+    assert(200 == response.getStatus)
+    assert(response.readEntity(classOf[String]) == "pong")
+  }
+
+  test("conf") {
+    assert(200 == httpGet("conf", MediaType.TEXT_PLAIN).getStatus)
+  }
+
+  test("listDynamicConfigs") {
+    val response = webTarget.path("listDynamicConfigs")
+      .queryParam("LEVEL", "TENANT")
+      .request(MediaType.TEXT_PLAIN)
+      .get()
+    assert(200 == response.getStatus)
+  }
+
+  test("workerInfo") {
+    assert(200 == httpGet("workerInfo", MediaType.TEXT_PLAIN).getStatus)
+  }
+
+  test("threadDump") {
+    assert(200 == httpGet("threadDump", MediaType.TEXT_PLAIN).getStatus)
+  }
+
+  test("shuffle") {
+    assert(200 == httpGet("shuffle", MediaType.TEXT_PLAIN).getStatus)
+  }
+
+  test("applications") {
+    assert(200 == httpGet("applications", MediaType.TEXT_PLAIN).getStatus)
+  }
+
+  test("listTopDiskUsedApps") {
+    assert(200 == httpGet("listTopDiskUsedApps", MediaType.TEXT_PLAIN).getStatus)
+  }
+
+  test("openapi.json") {
+    val response = httpGet("openapi.json", MediaType.APPLICATION_JSON)
+    assert(200 == response.getStatus)
+    assert(response.readEntity(classOf[String]).contains("/conf"))
+  }
+
+  test("swagger") {
+    // The Swagger UI is reachable under several aliases.
+    Seq("swagger", "docs", "help").foreach { path =>
+      val response = httpGet(path, MediaType.TEXT_HTML)
+      assert(200 == response.getStatus)
+      assert(response.readEntity(classOf[String]).contains("swagger-ui"))
+    }
+  }
+
+  test("metrics/prometheus") {
+    // NOTE(review): requests APPLICATION_JSON although the body checked is Prometheus text
+    // output; presumably the endpoint ignores the Accept header — confirm.
+    val response = httpGet("metrics/prometheus", MediaType.APPLICATION_JSON)
+    assert(200 == response.getStatus)
+    assert(response.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value"))
+  }
+
+  test("metrics/json") {
+    val response = httpGet("metrics/json", MediaType.APPLICATION_JSON)
+    assert(200 == response.getStatus)
+    assert(response.readEntity(classOf[String]).contains("\"name\" : \"jvm.memory.heap.max\""))
+  }
+}
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
new file mode 100644
index 000000000..8674fc39c
--- /dev/null
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+import java.net.URI
+import javax.ws.rs.client.WebTarget
+import javax.ws.rs.core.{Application, UriBuilder}
+
+import org.glassfish.jersey.client.ClientConfig
+import org.glassfish.jersey.media.multipart.MultiPartFeature
+import org.glassfish.jersey.server.ResourceConfig
+import org.glassfish.jersey.test.JerseyTest
+import org.glassfish.jersey.test.jetty.JettyTestContainerFactory
+import org.glassfish.jersey.test.spi.TestContainerFactory
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.server.common.HttpService
+import org.apache.celeborn.server.common.http.HttpTestHelper.RestApiBaseSuite
+import org.apache.celeborn.server.common.http.api.CelebornScalaObjectMapper
+
+object HttpTestHelper {
+
+  /**
+   * A [[JerseyTest]] whose JAX-RS client is used to call a running Celeborn service.
+   *
+   * Its own Jetty-backed test container registers only this class and [[MultiPartFeature]].
+   * Requests built by `HttpTestHelper` target the real service's connection URL instead of
+   * this container.
+   */
+  class RestApiBaseSuite extends JerseyTest {
+
+    override def getTestContainerFactory: TestContainerFactory = new JettyTestContainerFactory
+
+    override def configure: Application = {
+      val resources = new ResourceConfig(getClass)
+      resources.register(classOf[MultiPartFeature])
+    }
+
+    /** Registers the Celeborn Scala object mapper and multipart support on the client. */
+    override def configureClient(config: ClientConfig): Unit = {
+      config.register(classOf[CelebornScalaObjectMapper])
+      config.register(classOf[MultiPartFeature])
+    }
+  }
+}
+
+/**
+ * Base trait for suites that call a running Celeborn [[HttpService]] over HTTP.
+ *
+ * Implementations provide `httpService`; `webTarget` is rooted at
+ * `http://<httpService.connectionUrl>/` and uses the client of a [[RestApiBaseSuite]], which
+ * is set up in `beforeAll` and torn down in `afterAll`.
+ */
+trait HttpTestHelper extends AnyFunSuite
+  with BeforeAndAfterAll
+  with BeforeAndAfterEach
+  with Logging {
+
+  /** Configuration that implementations use to build the service under test. */
+  protected val celebornConf: CelebornConf = new CelebornConf()
+
+  /** The service whose HTTP endpoints are exercised. */
+  protected def httpService: HttpService
+
+  /** Supplies the JAX-RS client used by `webTarget`. */
+  protected val restApiBaseSuite: JerseyTest = new RestApiBaseSuite
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    restApiBaseSuite.setUp()
+  }
+
+  override def afterAll(): Unit = {
+    // Run the parent cleanup even if tearing down the Jersey test container/client fails.
+    try {
+      restApiBaseSuite.tearDown()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  protected lazy val baseUri: URI =
+    UriBuilder.fromUri(s"http://${httpService.connectionUrl}/").build()
+
+  protected lazy val webTarget: WebTarget = restApiBaseSuite.client.target(baseUri)
+}
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
deleted file mode 100644
index cbb5d18ef..000000000
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.server.common.http
-
-import org.scalatest.funsuite.AnyFunSuite
-
-import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.server.common.Service
-
-class HttpUtilsSuite extends AnyFunSuite with Logging {
-
- def checkParseUri(
- uri: String,
- expectPath: String,
- expectParameters: Map[String, String]): Unit = {
- val (path, parameters) = HttpUtils.parseUri(uri)
- assert(path == expectPath)
- assert(parameters == expectParameters)
- }
-
- test("CELEBORN-448: Support exclude worker manually") {
- checkParseUri("/exclude", "/exclude", Map.empty)
- checkParseUri(
- "/exclude?add=localhost:1001:1002:1003:1004",
- "/exclude",
- Map("ADD" -> "localhost:1001:1002:1003:1004"))
- checkParseUri(
- "/exclude?remove=localhost:1001:1002:1003:1004",
- "/exclude",
- Map("REMOVE" -> "localhost:1001:1002:1003:1004"))
- checkParseUri(
-
"/exclude?add=localhost:1001:1002:1003:1004&remove=localhost:2001:2002:2003:2004",
- "/exclude",
- Map("ADD" -> "localhost:1001:1002:1003:1004", "REMOVE" ->
"localhost:2001:2002:2003:2004"))
- }
-
- test("CELEBORN-847: Support parse HTTP Restful API parameters") {
- checkParseUri("/exit", "/exit", Map.empty)
- checkParseUri("/exit?type=decommission", "/exit", Map("TYPE" ->
"decommission"))
- checkParseUri(
- "/exit?type=decommission&foo=A",
- "/exit",
- Map("TYPE" -> "decommission", "FOO" -> "A"))
- }
-
- test("CELEBORN-829: Improve response message of invalid HTTP request") {
- assert(HttpUtils.help(Service.MASTER) ==
- s"""Available API providers include:
- |/applications List all running application's ids of the
cluster.
- |/conf List the conf setting of the master.
- |/exclude Excluded workers of the master add or remove
the worker manually given worker id. The parameter add or remove specifies the
excluded workers to add or remove, which value is separated by commas.
- |/excludedWorkers List all excluded workers of the master.
- |/help List the available API providers of the master.
- |/hostnames List all running application's
LifecycleManager's hostnames of the cluster.
- |/listDynamicConfigs List the dynamic configs of the master. The
parameter level specifies the config level of dynamic configs. The parameter
tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter
name specifies the user name of TENANT_USER level. Meanwhile, either none or
all of the parameter tenant and name are specified for TENANT_USER level.
- |/listTopDiskUsedApps List the top disk usage application ids. It
will return the top disk usage application ids for the cluster.
- |/lostWorkers List all lost workers of the master.
- |/masterGroupInfo List master group information of the service.
It will list all master's LEADER, FOLLOWER information.
- |/sendWorkerEvent For Master(Leader) can send worker event to
manager workers. Legal types are 'None', 'Immediately', 'Decommission',
'DecommissionThenIdle', 'Graceful', 'Recommission'
- |/shuffles List all running shuffle keys of the service.
It will return all running shuffle's key of the cluster.
- |/shutdownWorkers List all shutdown workers of the master.
- |/threadDump List the current thread dump of the master.
- |/workerEventInfo List all worker event infos of the master.
- |/workerInfo List worker information of the service. It will
list all registered workers 's information.
- |""".stripMargin)
- assert(HttpUtils.help(Service.WORKER) ==
- s"""Available API providers include:
- |/applications List all running application's ids of the
worker. It only return application ids running in that worker.
- |/conf List the conf setting of the worker.
- |/exit Trigger this worker to exit. Legal types
are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.
- |/help List the available API providers of the
worker.
- |/isRegistered Show if the worker is registered to the
master success.
- |/isShutdown Show if the worker is during the process
of shutdown.
- |/listDynamicConfigs List the dynamic configs of the worker.
The parameter level specifies the config level of dynamic configs. The
parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The
parameter name specifies the user name of TENANT_USER level. Meanwhile, either
none or all of the parameter tenant and name are specified for TENANT_USER
level.
- |/listPartitionLocationInfo List all the living PartitionLocation
information in that worker.
- |/listTopDiskUsedApps List the top disk usage application ids.
It only return application ids running in that worker.
- |/shuffles List all the running shuffle keys of the
worker. It only return keys of shuffles running in that worker.
- |/threadDump List the current thread dump of the
worker.
- |/unavailablePeers List the unavailable peers of the worker,
this always means the worker connect to the peer failed.
- |/workerInfo List the worker information of the worker.
- |""".stripMargin)
- }
-
- test("CELEBORN-1245: Support Master manage workers") {
- checkParseUri(
-
"/sendWorkerEvent?type=decommission&workers=localhost:1001:1002:1003:1004",
- "/sendWorkerEvent",
- Map("TYPE" -> "decommission", "WORKERS" ->
"localhost:1001:1002:1003:1004"))
- }
-
- test("CELEBORN-1056: Introduce Rest API of listing dynamic configuration") {
- checkParseUri("/listDynamicConfigs", "/listDynamicConfigs", Map.empty)
- checkParseUri(
- "/listDynamicConfigs?level=system",
- "/listDynamicConfigs",
- Map("LEVEL" -> "system"))
- checkParseUri(
- "/listDynamicConfigs?level=tenant&tenant=tenantId1",
- "/listDynamicConfigs",
- Map("LEVEL" -> "tenant", "TENANT" -> "tenantId1"))
- checkParseUri(
- "/listDynamicConfigs?level=tenant_user&tenant=tenantId1&name=user1",
- "/listDynamicConfigs",
- Map("LEVEL" -> "tenant_user", "TENANT" -> "tenantId1", "NAME" ->
"user1"))
- }
-}
diff --git a/tests/spark-it/pom.xml b/tests/spark-it/pom.xml
index 75e50221b..842d7a8ed 100644
--- a/tests/spark-it/pom.xml
+++ b/tests/spark-it/pom.xml
@@ -75,6 +75,14 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/worker/pom.xml b/worker/pom.xml
index c4a7c913d..3821fe9da 100644
--- a/worker/pom.xml
+++ b/worker/pom.xml
@@ -98,6 +98,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+    <!-- Test-only: provides the shared HTTP API test helpers (e.g. ApiBaseResourceSuite). -->
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-master_${scala.binary.version}</artifactId>
@@ -109,6 +115,16 @@
<artifactId>mockito-scala-scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+ <artifactId>jersey-test-framework-provider-jetty</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala
new file mode 100644
index 000000000..8573b7152
--- /dev/null
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.http.api
+
+import javax.ws.rs.{GET, Path, POST, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.responses.ApiResponse
+
+import org.apache.celeborn.server.common.http.api.ApiRequestContext
+
+/**
+ * Worker-specific REST endpoints.
+ *
+ * Every endpoint delegates to the matching method of `httpService` and returns its text result.
+ */
+@Path("/")
+class ApiWorkerResource extends ApiRequestContext {
+
+  /** Text listing of the living PartitionLocations held by this worker. */
+  @GET
+  @Path("/listPartitionLocationInfo")
+  @ApiResponse(
+    description = "List all the living PartitionLocation information in that worker.",
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)))
+  def listPartitionLocationInfo: String = httpService.listPartitionLocationInfo
+
+  /** Text listing of the peers this worker reports as unavailable. */
+  @GET
+  @Path("/unavailablePeers")
+  @ApiResponse(
+    description =
+      "List the unavailable peers of the worker, this always means the worker connect to the peer failed.",
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)))
+  def unavailablePeers: String = httpService.getUnavailablePeers
+
+  /** Whether the worker is currently shutting down, as text. */
+  @GET
+  @Path("/isShutdown")
+  @ApiResponse(
+    description = "Show if the worker is during the process of shutdown.",
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)))
+  def isShutdown: String = httpService.isShutdown
+
+  /** Whether the worker has registered with the master, as text. */
+  @GET
+  @Path("/isRegistered")
+  @ApiResponse(
+    description = "Show if the worker is registered to the master success.",
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)))
+  def isRegistered: String = httpService.isRegistered
+
+  /**
+   * Asks the worker to exit.
+   *
+   * @param exitType value of the `TYPE` query parameter, passed through `normalizeParam`
+   */
+  @POST
+  @Path("/exit")
+  @ApiResponse(
+    description =
+      "Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.",
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)))
+  def exit(@QueryParam("TYPE") exitType: String): String =
+    httpService.exit(normalizeParam(exitType))
+}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
new file mode 100644
index 000000000..cc09764d0
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import javax.ws.rs.core.MediaType
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.server.common.HttpService
+import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
+import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
+
+/**
+ * Runs the shared [[ApiBaseResourceSuite]] checks, plus the worker-only endpoints, against a
+ * [[Worker]] started in-process from `celebornConf`.
+ */
+class ApiWorkerResourceSuite extends ApiBaseResourceSuite {
+  // Assigned in beforeAll; still null if beforeAll fails before the worker is constructed.
+  private var worker: Worker = _
+  override protected def httpService: HttpService = worker
+
+  override def beforeAll(): Unit = {
+    val workerArgs = new WorkerArguments(Array(), celebornConf)
+    worker = new Worker(celebornConf, workerArgs)
+    worker.metricsSystem.start()
+    // Start the worker's HTTP server before the shared client is set up in super.beforeAll().
+    worker.startHttpServer()
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    // Release the worker even if the shared client teardown fails.
+    try {
+      super.afterAll()
+    } finally {
+      if (worker != null) {
+        worker.metricsSystem.stop()
+        worker.rpcEnv.shutdown()
+        worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+      }
+    }
+  }
+
+  test("listPartitionLocationInfo") {
+    val response =
+      webTarget.path("listPartitionLocationInfo").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("unavailablePeers") {
+    val response = webTarget.path("unavailablePeers").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("isShutdown") {
+    val response = webTarget.path("isShutdown").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+
+  test("isRegistered") {
+    val response = webTarget.path("isRegistered").request(MediaType.TEXT_PLAIN).get()
+    assert(200 == response.getStatus)
+  }
+}