This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new e17b24fd [#999] improvement: apply REST principles in the urls of the 
http interfaces (#1000)
e17b24fd is described below

commit e17b24fd0a9e07c23aba60acb3d61089c1378842
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Jul 17 15:15:23 2023 +0800

    [#999] improvement: apply REST principles in the urls of the http 
interfaces (#1000)
    
    ### What changes were proposed in this pull request?
    
    Apply REST principles in the urls of the http interfaces
    
    ### Why are the changes needed?
    Fix: #999
    
    ### Does this PR introduce _any_ user-facing change?
    
    Env:
    Server IP: 127.0.0.1
    HTTP port: 19998
    RPC port: 19999
    #### Decommission single shuffle server example:
    ```
    curl -XPOST -H "Content-type:application/json" "http://127.0.0.1:19998/api/server/127.0.0.1-19999/decommission"
    ```
    ####  Cancel decommission single shuffle server example:
    ```
    curl -XPOST -H "Content-type:application/json" "http://127.0.0.1:19998/api/server/127.0.0.1-19999/cancelDecommission"
    ```
    ####  Fetch single shuffle server:
    ```
    curl -X GET http://127.0.0.1:19998/api/server/nodes/127.0.0.1-19999
    ```
    #### Fetch server list:
    ```
    #path: /api/server/nodes[?status={status}]
    curl  "http://127.0.0.1:19998/api/server/nodes"
    curl  "http://127.0.0.1:19998/api/server/nodes?status=DECOMMISSIONING"
    curl  "http://127.0.0.1:19998/api/server/nodes?status=ACTIVE"
    ```
    ### How was this patch tested?
    
    UT
    
    Co-authored-by: xianjingfeng <[email protected]>
---
 .../apache/uniffle/common/metrics/TestUtils.java   | 10 ++-
 .../coordinator/web/resource/AdminResource.java    | 19 +++---
 .../coordinator/web/resource/BaseResource.java     | 32 ++++++++++
 .../coordinator/web/resource/ServerResource.java   | 72 ++++++++++++---------
 docs/coordinator_guide.md                          | 64 +++++++++++++++++--
 .../java/org/apache/uniffle/test/ServletTest.java  | 73 ++++++++++++++++++----
 6 files changed, 216 insertions(+), 54 deletions(-)

diff --git 
a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java 
b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
index f2511eac..c08d09e6 100644
--- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
+++ b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
@@ -42,6 +42,10 @@ public class TestUtils {
     return content.toString();
   }
 
+  public static String httpPost(String urlString) throws IOException {
+    return httpPost(urlString, null);
+  }
+
   public static String httpPost(String urlString, String postData) throws 
IOException {
     URL url = new URL(urlString);
     HttpURLConnection con = (HttpURLConnection) url.openConnection();
@@ -49,8 +53,10 @@ public class TestUtils {
     con.setRequestMethod("POST");
     con.setRequestProperty("Content-type", "application/json");
     StringBuilder content = new StringBuilder();
-    try (OutputStream outputStream = con.getOutputStream(); ) {
-      outputStream.write(postData.getBytes());
+    try (OutputStream outputStream = con.getOutputStream()) {
+      if (postData != null) {
+        outputStream.write(postData.getBytes());
+      }
       try (BufferedReader in = new BufferedReader(new 
InputStreamReader(con.getInputStream())); ) {
         String inputLine;
         while ((inputLine = in.readLine()) != null) {
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
index c235565a..d895c1df 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.coordinator.web.resource;
 
 import java.util.List;
 import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
 
 import org.apache.hbase.thirdparty.javax.ws.rs.GET;
 import org.apache.hbase.thirdparty.javax.ws.rs.Path;
@@ -35,20 +34,22 @@ import 
org.apache.uniffle.coordinator.access.checker.AccessChecker;
 import org.apache.uniffle.coordinator.web.Response;
 
 @Produces({MediaType.APPLICATION_JSON})
-public class AdminResource {
+public class AdminResource extends BaseResource {
   private static final Logger LOG = 
LoggerFactory.getLogger(AdminResource.class);
-  @Context private HttpServletRequest httpRequest;
   @Context protected ServletContext servletContext;
 
   @GET
   @Path("/refreshChecker")
   public Response<List<ServerNode>> refreshChecker() {
-    List<AccessChecker> accessCheckers = 
getAccessManager().getAccessCheckers();
-    LOG.info(
-        "The access checker {} has been refreshed, you can add the checker via 
rss.coordinator.access.checkers.",
-        accessCheckers);
-    accessCheckers.forEach(AccessChecker::refreshAccessChecker);
-    return Response.success(null);
+    return execute(
+        () -> {
+          List<AccessChecker> accessCheckers = 
getAccessManager().getAccessCheckers();
+          LOG.info(
+              "The access checker {} has been refreshed, you can add the 
checker via rss.coordinator.access.checkers.",
+              accessCheckers);
+          accessCheckers.forEach(AccessChecker::refreshAccessChecker);
+          return null;
+        });
   }
 
   private AccessManager getAccessManager() {
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BaseResource.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BaseResource.java
new file mode 100644
index 00000000..861f91ba
--- /dev/null
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BaseResource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.resource;
+
+import java.util.concurrent.Callable;
+
+import org.apache.uniffle.coordinator.web.Response;
+
+public abstract class BaseResource {
+  protected <T> Response<T> execute(Callable<T> callable) {
+    try {
+      return Response.success(callable.call());
+    } catch (Throwable e) {
+      return Response.fail(e.getMessage());
+    }
+  }
+}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
index 6899b383..6d58e7b6 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
@@ -27,6 +27,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.hbase.thirdparty.javax.ws.rs.GET;
 import org.apache.hbase.thirdparty.javax.ws.rs.POST;
 import org.apache.hbase.thirdparty.javax.ws.rs.Path;
+import org.apache.hbase.thirdparty.javax.ws.rs.PathParam;
 import org.apache.hbase.thirdparty.javax.ws.rs.Produces;
 import org.apache.hbase.thirdparty.javax.ws.rs.QueryParam;
 import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
@@ -40,13 +41,18 @@ import 
org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
 import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
 
 @Produces({MediaType.APPLICATION_JSON})
-public class ServerResource {
+public class ServerResource extends BaseResource {
   @Context protected ServletContext servletContext;
 
+  @GET
+  @Path("/nodes/{id}")
+  public Response<ServerNode> node(@PathParam("id") String id) {
+    return execute(() -> getClusterManager().getServerNodeById(id));
+  }
+
   @GET
   @Path("/nodes")
-  public Response<List<ServerNode>> nodes(
-      @QueryParam("id") String id, @QueryParam("status") String status) {
+  public Response<List<ServerNode>> nodes(@QueryParam("status") String status) 
{
     ClusterManager clusterManager = getClusterManager();
     List<ServerNode> serverList;
     if (ServerStatus.UNHEALTHY.name().equalsIgnoreCase(status)) {
@@ -60,9 +66,6 @@ public class ServerResource {
         serverList.stream()
             .filter(
                 server -> {
-                  if (id != null && !id.equals(server.getId())) {
-                    return false;
-                  }
                   if (status != null && 
!server.getStatus().toString().equals(status)) {
                     return false;
                   }
@@ -75,34 +78,47 @@ public class ServerResource {
 
   @POST
   @Path("/cancelDecommission")
-  @Produces({MediaType.APPLICATION_JSON})
   public Response<Object> cancelDecommission(CancelDecommissionRequest params) 
{
-    if (CollectionUtils.isEmpty(params.getServerIds())) {
-      return Response.fail("Parameter[serverIds] should not be null!");
-    }
-    ClusterManager clusterManager = getClusterManager();
-    try {
-      params.getServerIds().forEach(clusterManager::cancelDecommission);
-    } catch (Exception e) {
-      return Response.fail(e.getMessage());
-    }
-    return Response.success(null);
+    return execute(
+        () -> {
+          assert CollectionUtils.isNotEmpty(params.getServerIds())
+              : "Parameter[serverIds] should not be null!";
+          
params.getServerIds().forEach(getClusterManager()::cancelDecommission);
+          return null;
+        });
+  }
+
+  @POST
+  @Path("/{id}/cancelDecommission")
+  public Response<Object> cancelDecommission(@PathParam("id") String serverId) 
{
+    return execute(
+        () -> {
+          getClusterManager().cancelDecommission(serverId);
+          return null;
+        });
   }
 
   @POST
   @Path("/decommission")
-  @Produces({MediaType.APPLICATION_JSON})
   public Response<Object> decommission(DecommissionRequest params) {
-    if (CollectionUtils.isEmpty(params.getServerIds())) {
-      return Response.fail("Parameter[serverIds] should not be null!");
-    }
-    ClusterManager clusterManager = getClusterManager();
-    try {
-      params.getServerIds().forEach(clusterManager::decommission);
-    } catch (Exception e) {
-      return Response.fail(e.getMessage());
-    }
-    return Response.success(null);
+    return execute(
+        () -> {
+          assert CollectionUtils.isNotEmpty(params.getServerIds())
+              : "Parameter[serverIds] should not be null!";
+          params.getServerIds().forEach(getClusterManager()::decommission);
+          return null;
+        });
+  }
+
+  @POST
+  @Path("/{id}/decommission")
+  @Produces({MediaType.APPLICATION_JSON})
+  public Response<Object> decommission(@PathParam("id") String serverId) {
+    return execute(
+        () -> {
+          getClusterManager().decommission(serverId);
+          return null;
+        });
   }
 
   private ClusterManager getClusterManager() {
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index f33d6084..ca317076 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -128,9 +128,27 @@ AccessQuotaChecker is a checker when the number of 
concurrent tasks submitted by
 |rss.coordinator.quota.default.app.num|5|Default number of apps at user level.|
 
 
-## RESTful API(beta)
+## RESTful API
 
-### Fetch Shuffle servers
+### Fetch single shuffle server
+
+<details>
+ <summary><code>GET</code> <code><b>/api/server/nodes/{id}</b></code> 
</summary>
+
+##### Parameters
+
+> |name|type|data type|description|
+> |----|----|---------|-----------|
+> |id|required|string|shuffle server id, eg:127.0.0.1-19999|
+##### Example cURL
+
+> ```bash
+>  curl -X GET http://localhost:19998/api/server/nodes/127.0.0.1-19999
+> ```
+</details>
+
+
+### Fetch shuffle servers
 
 <details>
  <summary><code>GET</code> <code><b>/api/server/nodes</b></code> </summary>
@@ -139,13 +157,13 @@ AccessQuotaChecker is a checker when the number of 
concurrent tasks submitted by
 
 > |name|type|data type|description|
 > |----|----|---------|-----------|
-> |id|required|string|shuffle server id, eg:127.0.0.1:19999|
 > |status|optional|string|Shuffle server status, eg:ACTIVE, DECOMMISSIONING, DECOMMISSIONED|
 
 ##### Example cURL
 
 > ```bash
 >  curl -X GET http://localhost:19998/api/server/nodes
+>  curl -X GET http://localhost:19998/api/server/nodes?status=ACTIVE
 > ```
 </details>
 
@@ -158,7 +176,7 @@ AccessQuotaChecker is a checker when the number of 
concurrent tasks submitted by
 
 > |name|type| data type         |description|
 > |----|-------------------|---------|-----------|
-> |serverIds|required| array |Shuffle server array, eg:["127.0.0.1:19999"]|
+> |serverIds|required| array |Shuffle server array, eg:["127.0.0.1-19999"]|
 > 
 ##### Example cURL
 
@@ -168,6 +186,25 @@ AccessQuotaChecker is a checker when the number of 
concurrent tasks submitted by
 </details>
 
 
+### Decommission single shuffle server
+
+<details>
+ <summary><code>POST</code> <code><b>/api/server/{id}/decommission</b></code> 
</summary>
+
+##### Parameters
+
+> | name |type| data type | description                          |
+> |------|-------------------|-----------|--------------------------------------|
+> | id   |required| string    | Shuffle server id, eg:127.0.0.1-19999 |
+>
+##### Example cURL
+
+> ```bash
+>  curl -X POST -H "Content-Type: application/json" http://localhost:19998/api/server/127.0.0.1-19999/decommission
+> ```
+</details>
+
+
 ### Cancel decommission shuffle servers
 
 <details>
@@ -184,4 +221,23 @@ AccessQuotaChecker is a checker when the number of 
concurrent tasks submitted by
 > ```bash
 >  curl -X POST -H "Content-Type: application/json" http://localhost:19998/api/server/cancelDecommission  -d '{"serverIds": ["127.0.0.1-19999"]}'
 > ```
+</details>
+
+
+### Cancel decommission single shuffle server
+
+<details>
+ <summary><code>POST</code> 
<code><b>/api/server/{id}/cancelDecommission</b></code> </summary>
+
+##### Parameters
+
+> |name|type| data type | description                             |
+> |----|-------------------|--------|-----------------------------------------|
+> |id|required| string | Shuffle server id, eg:"127.0.0.1-19999" |
+>
+##### Example cURL
+
+> ```bash
+>  curl -X POST -H "Content-Type: application/json" http://localhost:19998/api/server/127.0.0.1-19999/cancelDecommission
+> ```
 </details>
\ No newline at end of file
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index a4b6ffd1..abdd0673 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -59,11 +59,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ServletTest extends IntegrationTestBase {
   private static final String URL_PREFIX = "http://127.0.0.1:12345/api/";;
+  private static final String SINGLE_NODE_URL = URL_PREFIX + "server/nodes/%s";
   private static final String NODES_URL = URL_PREFIX + "server/nodes";
   private static final String LOSTNODES_URL = URL_PREFIX + 
"server/nodes?status=LOST";
   private static final String UNHEALTHYNODES_URL = URL_PREFIX + 
"server/nodes?status=UNHEALTHY";
   private static final String DECOMMISSION_URL = URL_PREFIX + 
"server/decommission";
   private static final String CANCEL_DECOMMISSION_URL = URL_PREFIX + 
"server/cancelDecommission";
+  private static final String DECOMMISSION_SINGLENODE_URL = URL_PREFIX + 
"server/%s/decommission";
+  private static final String CANCEL_DECOMMISSION_SINGLENODE_URL =
+      URL_PREFIX + "server/%s/cancelDecommission";
   private static CoordinatorServer coordinatorServer;
   private ObjectMapper objectMapper = new ObjectMapper();
 
@@ -112,6 +116,18 @@ public class ServletTest extends IntegrationTestBase {
         .until(() -> coordinatorServer.getClusterManager().list().size() == 4);
   }
 
+  @Test
+  public void testGetSingleNode() throws Exception {
+    ShuffleServer shuffleServer = shuffleServers.get(0);
+    String content = TestUtils.httpGet(String.format(SINGLE_NODE_URL, 
shuffleServer.getId()));
+    Response<HashMap<String, Object>> response =
+        objectMapper.readValue(content, new 
TypeReference<Response<HashMap<String, Object>>>() {});
+    HashMap<String, Object> server = response.getData();
+    assertEquals(0, response.getCode());
+    assertEquals(SHUFFLE_SERVER_PORT, 
Integer.parseInt(server.get("grpcPort").toString()));
+    assertEquals(ServerStatus.ACTIVE.toString(), server.get("status"));
+  }
+
   @Test
   public void testNodesServlet() throws Exception {
     String content = TestUtils.httpGet(NODES_URL);
@@ -127,16 +143,6 @@ public class ServletTest extends IntegrationTestBase {
     assertEquals(
         SHUFFLE_SERVER_PORT + 1, 
Integer.parseInt(serverList.get(1).get("grpcPort").toString()));
     assertEquals(ServerStatus.ACTIVE.toString(), 
serverList.get(1).get("status"));
-
-    // Only fetch one server.
-    ShuffleServer shuffleServer = shuffleServers.get(0);
-    content = TestUtils.httpGet(NODES_URL + "?id=" + shuffleServer.getId());
-    response =
-        objectMapper.readValue(
-            content, new TypeReference<Response<List<HashMap<String, 
Object>>>>() {});
-    serverList = response.getData();
-    assertEquals(1, serverList.size());
-    assertEquals(shuffleServer.getId(), serverList.get(0).get("id"));
   }
 
   @Test
@@ -198,7 +204,7 @@ public class ServletTest extends IntegrationTestBase {
     String content =
         TestUtils.httpPost(
             CANCEL_DECOMMISSION_URL, 
objectMapper.writeValueAsString(decommissionRequest));
-    Response<Object> response = objectMapper.readValue(content, 
Response.class);
+    Response<?> response = objectMapper.readValue(content, Response.class);
     assertEquals(-1, response.getCode());
     assertNotNull(response.getErrMsg());
     CancelDecommissionRequest cancelDecommissionRequest = new 
CancelDecommissionRequest();
@@ -240,4 +246,49 @@ public class ServletTest extends IntegrationTestBase {
     assertEquals(0, response.getCode());
     assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
   }
+
+  @Test
+  public void testDecommissionSingleNode() throws Exception {
+    ShuffleServer shuffleServer = shuffleServers.get(0);
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    String content =
+        TestUtils.httpPost(String.format(CANCEL_DECOMMISSION_SINGLENODE_URL, 
"not_exist_serverId"));
+    Response<?> response = objectMapper.readValue(content, Response.class);
+    assertEquals(-1, response.getCode());
+    assertNotNull(response.getErrMsg());
+    content =
+        TestUtils.httpPost(
+            String.format(CANCEL_DECOMMISSION_SINGLENODE_URL, 
shuffleServer.getId()));
+    response = objectMapper.readValue(content, Response.class);
+    assertEquals(0, response.getCode());
+
+    // Register shuffle, avoid server exiting immediately.
+    ShuffleServerGrpcClient shuffleServerClient =
+        new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+    shuffleServerClient.registerShuffle(
+        new RssRegisterShuffleRequest(
+            "testDecommissionServlet_appId", 0, Lists.newArrayList(new 
PartitionRange(0, 1)), ""));
+    content = TestUtils.httpPost(String.format(DECOMMISSION_SINGLENODE_URL, 
shuffleServer.getId()));
+    response = objectMapper.readValue(content, Response.class);
+    assertEquals(0, response.getCode());
+    assertEquals(ServerStatus.DECOMMISSIONING, 
shuffleServer.getServerStatus());
+
+    // Wait until shuffle server send heartbeat to coordinator.
+    Awaitility.await()
+        .timeout(10, TimeUnit.SECONDS)
+        .until(
+            () ->
+                ServerStatus.DECOMMISSIONING.equals(
+                    coordinatorServer
+                        .getClusterManager()
+                        .getServerNodeById(shuffleServer.getId())
+                        .getStatus()));
+    // Cancel decommission.
+    content =
+        TestUtils.httpPost(
+            String.format(CANCEL_DECOMMISSION_SINGLENODE_URL, 
shuffleServer.getId()));
+    response = objectMapper.readValue(content, Response.class);
+    assertEquals(0, response.getCode());
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+  }
 }

Reply via email to