This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8b7c2b3f1 [CELEBORN-1477][FOLLOWUP] Fix api v1 response issue
8b7c2b3f1 is described below
commit 8b7c2b3f12541f16e058ac96438f14d41bc37f84
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Jul 22 19:02:36 2024 -0700
[CELEBORN-1477][FOLLOWUP] Fix api v1 response issue
### What changes were proposed in this pull request?
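1. Fix the responses of the following APIs:
   - master GET /api/v1/masters
   - master GET /api/v1/applications/top_disk_usages
   - master & worker GET /api/v1/thread_dump
2. Fix a typo in the migration guide: the new thread dump endpoint is `/api/v1/thread_dump`, not `/api/v1/conf/thread_dump`.
3. Refine the API annotation order: put the HTTP method annotation (`@GET`/`@POST`) before `@Path`.
4. Enhance `RestExceptionMapper`: log each exception raised while serving a REST API request and return the error message as a JSON body (`{"message": ...}`).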
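A minimal, JDK-only sketch of the error-mapping rule described in item 4. `StatusException` is a hypothetical stand-in for `javax.ws.rs.WebApplicationException` so that the example runs without JAX-RS on the classpath, and the real mapper's `logWarning` call is omitted. The point is only the shape of the result: the original status for status-carrying exceptions, 500 otherwise, and a `{"message": ...}` body in both cases.

```scala
object ErrorBodySketch {
  // Hypothetical stand-in for javax.ws.rs.WebApplicationException:
  // an exception that already carries the HTTP status to return.
  final class StatusException(val status: Int, message: String)
    extends RuntimeException(message)

  // Mapping rule: keep the status of status-carrying exceptions, use 500 for
  // everything else, and always wrap the message in a {"message": ...} body.
  // (The real RestExceptionMapper also logs the exception; omitted here.)
  def toErrorResponse(e: Exception): (Int, Map[String, String]) = e match {
    case se: StatusException => (se.status, Map("message" -> se.getMessage))
    case other               => (500, Map("message" -> other.getMessage))
  }

  def main(args: Array[String]): Unit = {
    println(toErrorResponse(new StatusException(400, "bad request"))) // (400,Map(message -> bad request))
    println(toErrorResponse(new NullPointerException("boom")))        // (500,Map(message -> boom))
  }
}
```

Using a single `message` key for both branches gives clients one error shape to parse, regardless of which exception type reached the mapper.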
### Why are the changes needed?
For `/api/v1/masters`, the `id` field is not well formatted: it is rendered with `ByteString.toString` rather than as the UTF-8 text of the peer ID.
```
{
  "groupId": "c5196f6d-2c34-3ed3-8b8a-47bede733167",
  "leader": {
    "id": "<ByteString4960c29e size=1 contents=\"0\">",
    "address": "...:9872"
  },
  ...
}
```
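`ByteString.toString` produces a debug description like the one above, while `toStringUtf8` decodes the bytes, which is the switch the `MasterResource` diff makes. The sketch below reproduces the same pitfall using only the JDK, with `java.nio.ByteBuffer` as an analogous byte container. It is an illustration, not the Ratis/protobuf type itself, and the object name is hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object PeerIdFormattingSketch {
  // Raw bytes of a peer ID such as "0" (the value shown in the JSON above).
  val rawId: Array[Byte] = "0".getBytes(StandardCharsets.UTF_8)

  // Analogous to ByteString.toString: describes the container, not the text it holds.
  def debugForm(bytes: Array[Byte]): String = ByteBuffer.wrap(bytes).toString

  // Analogous to ByteString.toStringUtf8: decodes the bytes as UTF-8 text.
  def utf8Form(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    println(debugForm(rawId)) // a buffer description, not the peer ID
    println(utf8Form(rawId))  // 0
  }
}
```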
For `/api/v1/applications/top_disk_usages`, the request threw an NPE; the null items should be filtered out.
```
24/07/18 21:52:38,506 WARN [master-JettyThreadPool-40] RestExceptionMapper: Error occurs on accessing REST API.
java.lang.NullPointerException
    at org.apache.celeborn.service.deploy.master.http.api.v1.ApplicationResource.$anonfun$topDiskUsedApplications$2(ApplicationResource.scala:78)
```
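The fix drops null entries before mapping them to response items. A minimal sketch, assuming a hypothetical `AppDiskUsage(appId, estimatedUsage)` case class and an array that may contain null slots (the stack trace shows the NPE inside the mapping lambda):

```scala
object TopDiskUsageSketch {
  // Hypothetical stand-in for one entry of a snapshot's topNItems.
  final case class AppDiskUsage(appId: String, estimatedUsage: Long)

  // Filter out null slots first, mirroring `topNItems.filter(_ != null).map { ... }`.
  // Mapping first would dereference `appId` on a null entry and throw an NPE.
  def toResponseItems(topNItems: Array[AppDiskUsage]): List[(String, Long)] =
    topNItems.filter(_ != null).map(u => (u.appId, u.estimatedUsage)).toList

  def main(args: Array[String]): Unit = {
    val items = Array(AppDiskUsage("app-1", 300L), null, AppDiskUsage("app-2", 100L))
    println(toResponseItems(items)) // List((app-1,300), (app-2,100))
  }
}
```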
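For `/api/v1/thread_dump`, the response could not be written (HTTP 500: no `MessageBodyWriter` for media type `text/html`); it seems the method needs `@Produces(Array(MediaType.APPLICATION_JSON))`: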
```
Caused by: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
    at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:65)
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:139)
    at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1116)
    at org.glassfish.jersey.server.ServerRuntime$Responder.writeResponse(ServerRuntime.java:649)
    at org.glassfish.jersey.server.ServerRuntime$Responder.processResponse(ServerRuntime.java:380)
    at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:426)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:264)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
    ... 36 more
Caused by: org.glassfish.jersey.message.internal.MessageBodyProviderNotFoundException: MessageBodyWriter not found for media type=text/html, type=class scala.collection.immutable.Map$Map1, genericType=class scala.collection.immutable.Map$Map1.
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:224)
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:139)
    at org.glassfish.jersey.server.internal.JsonWithPaddingInterceptor.aroundWriteTo(JsonWithPaddingInterceptor.java:85)
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:139)
    at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:61)
    ... 51 more
```
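Besides `@Produces`, the `ApiV1BaseResource` diff also changes how `blockedByThreadId` is filled in. In Scala, `null.asInstanceOf[Long]` unboxes to `0L`, so the old expression reported a thread that is not blocked with `blockedByThreadId` 0; casting to `java.lang.Long` keeps the absent value as `null`, which the updated test asserts. The sketch below (hypothetical object name) isolates both expressions:

```scala
object BlockedByThreadIdSketch {
  // Old expression: null.asInstanceOf[Long] unboxes to 0L, so "not blocked" became 0.
  def oldStyle(blockedBy: Option[Long]): Long =
    blockedBy.getOrElse(null.asInstanceOf[Long])

  // New expression: getOrElse(null) yields a boxed value or null, and the cast
  // to java.lang.Long keeps "not blocked" as null in the response model.
  def newStyle(blockedBy: Option[Long]): java.lang.Long =
    blockedBy.getOrElse(null).asInstanceOf[java.lang.Long]

  def main(args: Array[String]): Unit = {
    println(oldStyle(None))      // 0
    println(newStyle(None))      // null
    println(newStyle(Some(42L))) // 42
  }
}
```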
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Integration testing.
For `/api/v1/masters`:
<img width="824" alt="image" src="https://github.com/user-attachments/assets/c0908d05-aebc-435a-8446-038dd18fb7cd">
For master `/api/v1/applications/top_disk_usages`:
<img width="559" alt="image" src="https://github.com/user-attachments/assets/50860735-9975-449a-9f77-24d8eafd2018">
For `/api/v1/thread_dump`:
<img width="1188" alt="image" src="https://github.com/user-attachments/assets/9844de22-45c6-46ba-9260-c8a7d28c2e1d">
Closes #2637 from turboFei/fix_id_info.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
docs/migration.md | 4 ++--
.../deploy/master/http/api/v1/ApplicationResource.scala | 6 +++---
.../service/deploy/master/http/api/v1/MasterResource.scala | 4 ++--
.../service/deploy/master/http/api/v1/WorkerResource.scala | 6 +++---
.../celeborn/server/common/http/api/ApiRequestContext.scala | 10 ++++++----
.../celeborn/server/common/http/api/v1/ApiV1BaseResource.scala | 6 ++++--
.../celeborn/server/common/http/api/v1/ConfResource.scala | 2 +-
.../server/common/http/api/v1/ApiV1BaseResourceSuite.scala | 6 +++++-
.../deploy/worker/http/api/v1/ApplicationResource.scala | 2 +-
.../service/deploy/worker/http/api/v1/ShuffleResource.scala | 2 +-
.../service/deploy/worker/http/api/v1/WorkerResource.scala | 2 +-
11 files changed, 29 insertions(+), 21 deletions(-)
diff --git a/docs/migration.md b/docs/migration.md
index cdbb91537..85f7716d9 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -34,7 +34,7 @@ license: |
|--------------------------|------------------------------------------|--------------------------------------------------|
| GET /conf | GET /api/v1/conf | |
| GET /listDynamicConfigs | GET /api/v1/conf/dynamic | |
- | GET /threadDump | GET /api/v1/conf/thread_dump | |
+ | GET /threadDump | GET /api/v1/thread_dump | |
| GET /applications | GET /api/v1/applications | |
| GET /listTopDiskUsedApps | GET /api/v1/applications/top_disk_usages | |
| GET /hostnames | GET /api/v1/applications/hostnames | |
@@ -56,7 +56,7 @@ license: |
|--------------------------------|------------------------------------------|---------------------------------------------|
| GET /conf | GET /api/v1/conf | |
| GET /listDynamicConfigs | GET /api/v1/conf/dynamic | |
- | GET /threadDump | GET /api/v1/conf/thread_dump | |
+ | GET /threadDump | GET /api/v1/thread_dump | |
| GET /applications | GET /api/v1/applications | |
| GET /listTopDiskUsedApps | GET /api/v1/applications/top_disk_usages | |
| GET /shuffle | GET /api/v1/shuffles | |
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
index 23b230728..7c08e335e 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
@@ -61,8 +61,8 @@ class ApplicationResource extends ApiRequestContext {
schema = new Schema(implementation =
classOf[AppDiskUsageSnapshotsResponse]))),
description =
"List the top disk usage application ids. It will return the top disk usage application ids for the cluster.")
- @Path("/top_disk_usages")
@GET
+ @Path("/top_disk_usages")
def topDiskUsedApplications(): AppDiskUsageSnapshotsResponse = {
new AppDiskUsageSnapshotsResponse()
.snapshots(
@@ -73,7 +73,7 @@ class ApplicationResource extends ApiRequestContext {
.end(
snapshot.endSnapShotTime)
.topNItems(
- snapshot.topNItems.map { usage =>
+ snapshot.topNItems.filter(_ != null).map { usage =>
new AppDiskUsageData()
.appId(usage.appId)
.estimatedUsage(usage.estimatedUsage)
@@ -89,8 +89,8 @@ class ApplicationResource extends ApiRequestContext {
schema = new Schema(implementation = classOf[HostnamesResponse]))),
description =
"List all running application's LifecycleManager's hostnames of the cluster.")
- @Path("/hostnames")
@GET
+ @Path("/hostnames")
def hostnames(): HostnamesResponse = {
new HostnamesResponse().hostnames(statusSystem.hostnameSet.asScala.toSeq.asJava)
}
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/MasterResource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/MasterResource.scala
index 94cd25700..b5a5d11bb 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/MasterResource.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/MasterResource.scala
@@ -59,13 +59,13 @@ class MasterResource extends ApiRequestContext {
}.orNull
val masterLeader = Option(leader).map { _ =>
new MasterLeader()
- .id(leader.getId.toString)
+ .id(leader.getId.toStringUtf8)
.address(leader.getAddress)
}.orNull
val masterCommitDataList = groupInfo.getCommitInfos.asScala.map {
commitInfo =>
new MasterCommitData()
.commitIndex(commitInfo.getCommitIndex)
- .id(commitInfo.getServer.getId.toString)
+ .id(commitInfo.getServer.getId.toStringUtf8)
.address(commitInfo.getServer.getAddress)
.clientAddress(commitInfo.getServer.getClientAddress)
.startUpRole(commitInfo.getServer.getStartupRole.toString)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
index 969a28c0e..92e5a9dad 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
@@ -69,8 +69,8 @@ class WorkerResource extends ApiRequestContext {
schema = new Schema(implementation = classOf[HandleResponse]))),
description =
"Excluded workers of the master add or remove the worker manually given worker id. The parameter add or remove specifies the excluded workers to add or remove.")
- @Path("/exclude")
@POST
+ @Path("/exclude")
def excludeWorker(request: ExcludeWorkerRequest): HandleResponse = {
val (success, msg) = httpService.exclude(
request.getAdd.asScala.map(ApiUtils.toWorkerInfo).toSeq,
@@ -85,8 +85,8 @@ class WorkerResource extends ApiRequestContext {
schema = new Schema(
implementation = classOf[WorkerEventsResponse]))),
description = "List all worker event infos of the master.")
- @Path("/events")
@GET
+ @Path("/events")
def workerEvents(): WorkerEventsResponse = {
new WorkerEventsResponse().workerEvents(
statusSystem.workerEventInfos.asScala.map { case (worker, event) =>
@@ -106,8 +106,8 @@ class WorkerResource extends ApiRequestContext {
schema = new Schema(implementation = classOf[HandleResponse]))),
description =
"For Master(Leader) can send worker event to manager workers. Legal types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 'Recommission'.")
- @Path("/events")
@POST
+ @Path("/events")
def sendWorkerEvents(request: SendWorkerEventRequest): HandleResponse = {
if (request.getEventType == SendWorkerEventRequest.EventTypeEnum.NONE ||
request.getWorkers.isEmpty) {
throw new BadRequestException(
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
index 568090cd7..10a6ca800 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
@@ -25,6 +25,7 @@ import javax.ws.rs.ext.{ExceptionMapper, Provider}
import org.eclipse.jetty.server.handler.ContextHandler
+import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.server.common.HttpService
private[celeborn] trait ApiRequestContext {
@@ -40,18 +41,19 @@ private[celeborn] trait ApiRequestContext {
}
@Provider
-class RestExceptionMapper extends ExceptionMapper[Exception] {
+class RestExceptionMapper extends ExceptionMapper[Exception] with Logging {
override def toResponse(exception: Exception): Response = {
+ logWarning("Error occurs on accessing REST API.", exception)
exception match {
case e: WebApplicationException =>
Response.status(e.getResponse.getStatus)
- .`type`(e.getResponse.getMediaType)
- .entity(e.getMessage)
+ .`type`(MediaType.APPLICATION_JSON)
+ .entity(Map("message" -> e.getMessage))
.build()
case e =>
Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.`type`(MediaType.APPLICATION_JSON)
- .entity(e.getMessage)
+ .entity(Map("message" -> e.getMessage))
.build()
}
}
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
index b0d451403..0de003425 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
@@ -17,7 +17,7 @@
package org.apache.celeborn.server.common.http.api.v1
-import javax.ws.rs.{GET, Path}
+import javax.ws.rs.{GET, Path, Produces}
import javax.ws.rs.core.MediaType
import scala.collection.JavaConverters._
@@ -43,6 +43,7 @@ class ApiV1BaseResource extends ApiRequestContext {
implementation = classOf[ThreadStackResponse]))),
description = "List the current thread dump.")
@GET
+ @Produces(Array(MediaType.APPLICATION_JSON))
def threadDump(): ThreadStackResponse = {
new ThreadStackResponse()
.threadStacks(Utils.getThreadDump().map { threadStack =>
@@ -51,7 +52,8 @@ class ApiV1BaseResource extends ApiRequestContext {
.threadName(threadStack.threadName)
.threadState(threadStack.threadState.toString)
.stackTrace(threadStack.stackTrace.elems.asJava)
- .blockedByThreadId(threadStack.blockedByThreadId.getOrElse(null.asInstanceOf[Long]): Long)
+ .blockedByThreadId(
+   threadStack.blockedByThreadId.getOrElse(null).asInstanceOf[java.lang.Long])
.blockedByLock(threadStack.blockedByLock)
.holdingLocks(threadStack.holdingLocks.asJava)
}.asJava)
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ConfResource.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ConfResource.scala
index 01b60c688..d24e66154 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ConfResource.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ConfResource.scala
@@ -65,8 +65,8 @@ private[api] class ConfResource extends ApiRequestContext {
"The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. " +
"The parameter name specifies the user name of TENANT_USER level. " +
"Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.")
- @Path("/dynamic")
@GET
+ @Path("/dynamic")
def dynamicConf(
@QueryParam("level") level: String,
@QueryParam("tenant") tenant: String,
diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
index 1f4a9a19a..274c65c13 100644
--- a/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
+++ b/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
@@ -21,6 +21,8 @@ import java.net.URI
import javax.servlet.http.HttpServletResponse
import javax.ws.rs.core.{MediaType, UriBuilder}
+import scala.collection.JavaConverters._
+
import org.apache.celeborn.rest.v1.model.{ConfResponse, ThreadStackResponse}
import org.apache.celeborn.server.common.http.HttpTestHelper
@@ -41,6 +43,8 @@ abstract class ApiV1BaseResourceSuite extends HttpTestHelper {
test("thread_dump") {
val response =
webTarget.path("thread_dump").request(MediaType.APPLICATION_JSON).get()
assert(HttpServletResponse.SC_OK == response.getStatus)
- assert(!response.readEntity(classOf[ThreadStackResponse]).getThreadStacks.isEmpty)
+ val threadStacks = response.readEntity(classOf[ThreadStackResponse]).getThreadStacks.asScala
+ assert(threadStacks.nonEmpty)
+ assert(threadStacks.exists(_.getBlockedByThreadId == null))
}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
index 06fa9faea..5bb15c6ed 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
@@ -58,8 +58,8 @@ class ApplicationResource extends ApiRequestContext {
schema = new Schema(implementation = classOf[AppDiskUsagesResponse]))),
description =
"List the top disk usage application ids. It will return the top disk usage application ids for the cluster.")
- @Path("/top_disk_usages")
@GET
+ @Path("/top_disk_usages")
def topDiskUsedApplications(): AppDiskUsagesResponse = {
new AppDiskUsagesResponse()
.appDiskUsages(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ShuffleResource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ShuffleResource.scala
index a62cf745c..ef43b4ca3 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ShuffleResource.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ShuffleResource.scala
@@ -58,8 +58,8 @@ class ShuffleResource extends ApiRequestContext {
schema = new Schema(
implementation = classOf[ShufflePartitionsResponse]))),
description = "List all the living shuffle PartitionLocation information in the worker.")
- @Path("/partitions")
@GET
+ @Path("/partitions")
def partitions(): ShufflePartitionsResponse = {
new ShufflePartitionsResponse()
.primaryPartitions(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala
index 31229c6b3..3748df63e 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala
@@ -60,8 +60,8 @@ class WorkerResource extends ApiRequestContext {
schema = new Schema(implementation =
classOf[UnAvailablePeersResponse]))),
description =
"List the unavailable peers of the worker, this always means the worker connect to the peer failed.")
- @Path("/unavailable_peers")
@GET
+ @Path("/unavailable_peers")
def unavailablePeerWorkers(): UnAvailablePeersResponse = {
new UnAvailablePeersResponse()
.peers(