This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b7c2b3f1 [CELEBORN-1477][FOLLOWUP] Fix api v1 response issue
8b7c2b3f1 is described below

commit 8b7c2b3f12541f16e058ac96438f14d41bc37f84
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Jul 22 19:02:36 2024 -0700

    [CELEBORN-1477][FOLLOWUP] Fix api v1 response issue
    
    ### What changes were proposed in this pull request?
    
    1. Fix the responses of the following APIs:
    
    - master GET /api/v1/masters
    - master GET /api/v1/applications/top_disk_usages
    - master&worker /api/v1/thread_dump
    
    2. Fix typo in migration guide
    
    3. Refine the API annotation order: METHOD -> PATH (e.g. `@GET` before `@Path`)
    
    4. Enhance the `RestExceptionMapper`: log the exception and return the message as JSON
    
    ### Why are the changes needed?
    
    For `/api/v1/masters`, the `id` field is not well formatted; it shows the `ByteString` string representation instead of the id itself:
    ```
    {
    "groupId": "c5196f6d-2c34-3ed3-8b8a-47bede733167",
    "leader": {
    "id": "<ByteString4960c29e size=1 contents=\"0\">",
    "address": "...:9872"
    },
    ...
    }
    ```
    
    For `/api/v1/applications/top_disk_usages`, the API threw an NPE; we should filter out the null items.
    ```
    24/07/18 21:52:38,506 WARN [master-JettyThreadPool-40] RestExceptionMapper: Error occurs on accessing REST API.
    java.lang.NullPointerException
            at org.apache.celeborn.service.deploy.master.http.api.v1.ApplicationResource.$anonfun$topDiskUsedApplications$2(ApplicationResource.scala:78)
    ```
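    
    A minimal sketch of the null filtering mentioned above, in plain Scala. `Usage` and `TopNSketch` are hypothetical stand-ins, not the actual Celeborn classes; the only assumption is that the top-N array may contain null slots:
    ```scala
    object TopNSketch {
      // Hypothetical stand-in for one disk usage item.
      final case class Usage(appId: String, estimatedUsage: Long)

      // Drop null slots before mapping, so `_.appId` never dereferences null.
      def appIds(items: Array[Usage]): Seq[String] =
        items.filter(_ != null).map(_.appId).toSeq
    }
    ```
    Without the `filter`, mapping over an array with a null slot would throw a `NullPointerException` like the one in the log above.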
    
    For `api/v1/thread_dump`, it seems we need to add `@Produces(Array(MediaType.APPLICATION_JSON))`:
    ```
    Caused by: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
            at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:65)
            at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:139)
            at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1116)
            at org.glassfish.jersey.server.ServerRuntime$Responder.writeResponse(ServerRuntime.java:649)
            at org.glassfish.jersey.server.ServerRuntime$Responder.processResponse(ServerRuntime.java:380)
            at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:426)
            at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:264)
            at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
            at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
            at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
            at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
            at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
            at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
            at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235)
            at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
            at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
            ... 36 more
    Caused by: org.glassfish.jersey.message.internal.MessageBodyProviderNotFoundException: MessageBodyWriter not found for media type=text/html, type=class scala.collection.immutable.Map$Map1, genericType=class scala.collection.immutable.Map$Map1.
            at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:224)
            at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:139)
            at org.glassfish.jersey.server.internal.JsonWithPaddingInterceptor.aroundWriteTo(JsonWithPaddingInterceptor.java:85)
            at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:139)
            at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:61)
            ... 51 more
    ```
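    
    Besides `@Produces`, the patch also changes how a missing `blockedByThreadId` is passed to the response model. A minimal sketch of the difference, assuming standard Scala 2 semantics for casting null; `BlockedBySketch` and its method names are hypothetical and only mirror the two expressions in the diff:
    ```scala
    object BlockedBySketch {
      // Old form: null.asInstanceOf[Long] unboxes to 0L, so a missing id became 0.
      def oldForm(id: Option[Long]): java.lang.Long =
        id.getOrElse(null.asInstanceOf[Long]): Long

      // New form: the cast happens after getOrElse, so a missing id stays null.
      def newForm(id: Option[Long]): java.lang.Long =
        id.getOrElse(null).asInstanceOf[java.lang.Long]
    }
    ```
    With the new form, a thread that is not blocked reports a null `blockedByThreadId`, which is what the updated `thread_dump` test checks for.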
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    Integration testing.
    
    For `api/v1/masters`:
    <img width="824" alt="image" src="https://github.com/user-attachments/assets/c0908d05-aebc-435a-8446-038dd18fb7cd">
    
    For master `api/v1/applications/top_disk_usages`:
    <img width="559" alt="image" src="https://github.com/user-attachments/assets/50860735-9975-449a-9f77-24d8eafd2018">
    
    For `api/v1/thread_dump`:
    <img width="1188" alt="image" src="https://github.com/user-attachments/assets/9844de22-45c6-46ba-9260-c8a7d28c2e1d">
    
    Closes #2637 from turboFei/fix_id_info.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 docs/migration.md                                              |  4 ++--
 .../deploy/master/http/api/v1/ApplicationResource.scala        |  6 +++---
 .../service/deploy/master/http/api/v1/MasterResource.scala     |  4 ++--
 .../service/deploy/master/http/api/v1/WorkerResource.scala     |  6 +++---
 .../celeborn/server/common/http/api/ApiRequestContext.scala    | 10 ++++++----
 .../celeborn/server/common/http/api/v1/ApiV1BaseResource.scala |  6 ++++--
 .../celeborn/server/common/http/api/v1/ConfResource.scala      |  2 +-
 .../server/common/http/api/v1/ApiV1BaseResourceSuite.scala     |  6 +++++-
 .../deploy/worker/http/api/v1/ApplicationResource.scala        |  2 +-
 .../service/deploy/worker/http/api/v1/ShuffleResource.scala    |  2 +-
 .../service/deploy/worker/http/api/v1/WorkerResource.scala     |  2 +-
 11 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/docs/migration.md b/docs/migration.md
index cdbb91537..85f7716d9 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -34,7 +34,7 @@ license: |
     |--------------------------|------------------------------------------|--------------------------------------------------|
     | GET /conf                | GET /api/v1/conf                         |                                                  |
     | GET /listDynamicConfigs  | GET /api/v1/conf/dynamic                 |                                                  |
-    | GET /threadDump          | GET /api/v1/conf/thread_dump             |                                                  |
+    | GET /threadDump          | GET /api/v1/thread_dump                  |                                                  |
     | GET /applications        | GET /api/v1/applications                 |                                                  |
     | GET /listTopDiskUsedApps | GET /api/v1/applications/top_disk_usages |                                                  |
     | GET /hostnames           | GET /api/v1/applications/hostnames       |                                                  |
@@ -56,7 +56,7 @@ license: |
     |--------------------------------|------------------------------------------|---------------------------------------------|
     | GET /conf                      | GET /api/v1/conf                         |                                             |
     | GET /listDynamicConfigs        | GET /api/v1/conf/dynamic                 |                                             |
-    | GET /threadDump                | GET /api/v1/conf/thread_dump             |                                             |
+    | GET /threadDump                | GET /api/v1/thread_dump                  |                                             |
     | GET /applications              | GET /api/v1/applications                 |                                             |
     | GET /listTopDiskUsedApps       | GET /api/v1/applications/top_disk_usages |                                             |
     | GET /shuffle                   | GET /api/v1/shuffles                     |                                             |
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
index 23b230728..7c08e335e 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
@@ -61,8 +61,8 @@ class ApplicationResource extends ApiRequestContext {
       schema = new Schema(implementation = 
classOf[AppDiskUsageSnapshotsResponse]))),
     description =
       "List the top disk usage application ids. It will return the top disk 
usage application ids for the cluster.")
-  @Path("/top_disk_usages")
   @GET
+  @Path("/top_disk_usages")
   def topDiskUsedApplications(): AppDiskUsageSnapshotsResponse = {
     new AppDiskUsageSnapshotsResponse()
       .snapshots(
@@ -73,7 +73,7 @@ class ApplicationResource extends ApiRequestContext {
             .end(
               snapshot.endSnapShotTime)
             .topNItems(
-              snapshot.topNItems.map { usage =>
+              snapshot.topNItems.filter(_ != null).map { usage =>
                 new AppDiskUsageData()
                   .appId(usage.appId)
                   .estimatedUsage(usage.estimatedUsage)
@@ -89,8 +89,8 @@ class ApplicationResource extends ApiRequestContext {
       schema = new Schema(implementation = classOf[HostnamesResponse]))),
     description =
       "List all running application's LifecycleManager's hostnames of the 
cluster.")
-  @Path("/hostnames")
   @GET
+  @Path("/hostnames")
   def hostnames(): HostnamesResponse = {
     new 
HostnamesResponse().hostnames(statusSystem.hostnameSet.asScala.toSeq.asJava)
   }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/MasterResource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/MasterResource.scala
index 94cd25700..b5a5d11bb 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/MasterResource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/MasterResource.scala
@@ -59,13 +59,13 @@ class MasterResource extends ApiRequestContext {
       }.orNull
       val masterLeader = Option(leader).map { _ =>
         new MasterLeader()
-          .id(leader.getId.toString)
+          .id(leader.getId.toStringUtf8)
           .address(leader.getAddress)
       }.orNull
       val masterCommitDataList = groupInfo.getCommitInfos.asScala.map { 
commitInfo =>
         new MasterCommitData()
           .commitIndex(commitInfo.getCommitIndex)
-          .id(commitInfo.getServer.getId.toString)
+          .id(commitInfo.getServer.getId.toStringUtf8)
           .address(commitInfo.getServer.getAddress)
           .clientAddress(commitInfo.getServer.getClientAddress)
           .startUpRole(commitInfo.getServer.getStartupRole.toString)
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
index 969a28c0e..92e5a9dad 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
@@ -69,8 +69,8 @@ class WorkerResource extends ApiRequestContext {
       schema = new Schema(implementation = classOf[HandleResponse]))),
     description =
       "Excluded workers of the master add or remove the worker manually given 
worker id. The parameter add or remove specifies the excluded workers to add or 
remove.")
-  @Path("/exclude")
   @POST
+  @Path("/exclude")
   def excludeWorker(request: ExcludeWorkerRequest): HandleResponse = {
     val (success, msg) = httpService.exclude(
       request.getAdd.asScala.map(ApiUtils.toWorkerInfo).toSeq,
@@ -85,8 +85,8 @@ class WorkerResource extends ApiRequestContext {
       schema = new Schema(
         implementation = classOf[WorkerEventsResponse]))),
     description = "List all worker event infos of the master.")
-  @Path("/events")
   @GET
+  @Path("/events")
   def workerEvents(): WorkerEventsResponse = {
     new WorkerEventsResponse().workerEvents(
       statusSystem.workerEventInfos.asScala.map { case (worker, event) =>
@@ -106,8 +106,8 @@ class WorkerResource extends ApiRequestContext {
       schema = new Schema(implementation = classOf[HandleResponse]))),
     description =
       "For Master(Leader) can send worker event to manager workers. Legal 
types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 
'Graceful', 'Recommission'.")
-  @Path("/events")
   @POST
+  @Path("/events")
   def sendWorkerEvents(request: SendWorkerEventRequest): HandleResponse = {
     if (request.getEventType == SendWorkerEventRequest.EventTypeEnum.NONE || 
request.getWorkers.isEmpty) {
       throw new BadRequestException(
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
index 568090cd7..10a6ca800 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
@@ -25,6 +25,7 @@ import javax.ws.rs.ext.{ExceptionMapper, Provider}
 
 import org.eclipse.jetty.server.handler.ContextHandler
 
+import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.server.common.HttpService
 
 private[celeborn] trait ApiRequestContext {
@@ -40,18 +41,19 @@ private[celeborn] trait ApiRequestContext {
 }
 
 @Provider
-class RestExceptionMapper extends ExceptionMapper[Exception] {
+class RestExceptionMapper extends ExceptionMapper[Exception] with Logging {
   override def toResponse(exception: Exception): Response = {
+    logWarning("Error occurs on accessing REST API.", exception)
     exception match {
       case e: WebApplicationException =>
         Response.status(e.getResponse.getStatus)
-          .`type`(e.getResponse.getMediaType)
-          .entity(e.getMessage)
+          .`type`(MediaType.APPLICATION_JSON)
+          .entity(Map("message" -> e.getMessage))
           .build()
       case e =>
         Response.status(Response.Status.INTERNAL_SERVER_ERROR)
           .`type`(MediaType.APPLICATION_JSON)
-          .entity(e.getMessage)
+          .entity(Map("message" -> e.getMessage))
           .build()
     }
   }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
index b0d451403..0de003425 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.celeborn.server.common.http.api.v1
 
-import javax.ws.rs.{GET, Path}
+import javax.ws.rs.{GET, Path, Produces}
 import javax.ws.rs.core.MediaType
 
 import scala.collection.JavaConverters._
@@ -43,6 +43,7 @@ class ApiV1BaseResource extends ApiRequestContext {
         implementation = classOf[ThreadStackResponse]))),
     description = "List the current thread dump.")
   @GET
+  @Produces(Array(MediaType.APPLICATION_JSON))
   def threadDump(): ThreadStackResponse = {
     new ThreadStackResponse()
       .threadStacks(Utils.getThreadDump().map { threadStack =>
@@ -51,7 +52,8 @@ class ApiV1BaseResource extends ApiRequestContext {
           .threadName(threadStack.threadName)
           .threadState(threadStack.threadState.toString)
           .stackTrace(threadStack.stackTrace.elems.asJava)
-          .blockedByThreadId(threadStack.blockedByThreadId.getOrElse(null.asInstanceOf[Long]): Long)
+          .blockedByThreadId(
+            threadStack.blockedByThreadId.getOrElse(null).asInstanceOf[java.lang.Long])
           .blockedByLock(threadStack.blockedByLock)
           .holdingLocks(threadStack.holdingLocks.asJava)
       }.asJava)
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ConfResource.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ConfResource.scala
index 01b60c688..d24e66154 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ConfResource.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ConfResource.scala
@@ -65,8 +65,8 @@ private[api] class ConfResource extends ApiRequestContext {
       "The parameter tenant specifies the tenant id of TENANT or TENANT_USER 
level. " +
       "The parameter name specifies the user name of TENANT_USER level. " +
       "Meanwhile, either none or all of the parameter tenant and name are 
specified for TENANT_USER level.")
-  @Path("/dynamic")
   @GET
+  @Path("/dynamic")
   def dynamicConf(
       @QueryParam("level") level: String,
       @QueryParam("tenant") tenant: String,
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
index 1f4a9a19a..274c65c13 100644
--- 
a/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
@@ -21,6 +21,8 @@ import java.net.URI
 import javax.servlet.http.HttpServletResponse
 import javax.ws.rs.core.{MediaType, UriBuilder}
 
+import scala.collection.JavaConverters._
+
 import org.apache.celeborn.rest.v1.model.{ConfResponse, ThreadStackResponse}
 import org.apache.celeborn.server.common.http.HttpTestHelper
 
@@ -41,6 +43,8 @@ abstract class ApiV1BaseResourceSuite extends HttpTestHelper {
   test("thread_dump") {
     val response = webTarget.path("thread_dump").request(MediaType.APPLICATION_JSON).get()
     assert(HttpServletResponse.SC_OK == response.getStatus)
-    assert(!response.readEntity(classOf[ThreadStackResponse]).getThreadStacks.isEmpty)
+    val threadStacks = response.readEntity(classOf[ThreadStackResponse]).getThreadStacks.asScala
+    assert(threadStacks.nonEmpty)
+    assert(threadStacks.exists(_.getBlockedByThreadId == null))
   }
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
index 06fa9faea..5bb15c6ed 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
@@ -58,8 +58,8 @@ class ApplicationResource extends ApiRequestContext {
       schema = new Schema(implementation = classOf[AppDiskUsagesResponse]))),
     description =
       "List the top disk usage application ids. It will return the top disk 
usage application ids for the cluster.")
-  @Path("/top_disk_usages")
   @GET
+  @Path("/top_disk_usages")
   def topDiskUsedApplications(): AppDiskUsagesResponse = {
     new AppDiskUsagesResponse()
       .appDiskUsages(
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ShuffleResource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ShuffleResource.scala
index a62cf745c..ef43b4ca3 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ShuffleResource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ShuffleResource.scala
@@ -58,8 +58,8 @@ class ShuffleResource extends ApiRequestContext {
       schema = new Schema(
         implementation = classOf[ShufflePartitionsResponse]))),
     description = "List all the living shuffle PartitionLocation information 
in the worker.")
-  @Path("/partitions")
   @GET
+  @Path("/partitions")
   def partitions(): ShufflePartitionsResponse = {
     new ShufflePartitionsResponse()
       .primaryPartitions(
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala
index 31229c6b3..3748df63e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/WorkerResource.scala
@@ -60,8 +60,8 @@ class WorkerResource extends ApiRequestContext {
       schema = new Schema(implementation = 
classOf[UnAvailablePeersResponse]))),
     description =
       "List the unavailable peers of the worker, this always means the worker 
connect to the peer failed.")
-  @Path("/unavailable_peers")
   @GET
+  @Path("/unavailable_peers")
   def unavailablePeerWorkers(): UnAvailablePeersResponse = {
     new UnAvailablePeersResponse()
       .peers(
