This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new e17b24fd [#999] improvement: apply REST principles in the urls of the
http interfaces (#1000)
e17b24fd is described below
commit e17b24fd0a9e07c23aba60acb3d61089c1378842
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Jul 17 15:15:23 2023 +0800
[#999] improvement: apply REST principles in the urls of the http
interfaces (#1000)
### What changes were proposed in this pull request?
Apply REST principles in the urls of the http interfaces
### Why are the changes needed?
Fix: #999
### Does this PR introduce _any_ user-facing change?
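Yes. New per-server endpoints are added (`/api/server/nodes/{id}`, `/api/server/{id}/decommission`, `/api/server/{id}/cancelDecommission`), and the `id` query parameter of `/api/server/nodes` is removed. The examples below assume this environment: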
Server IP: 127.0.0.1
HTTP port: 19998
RPC port: 19999
#### Decommission single shuffle server example:
```
curl -XPOST -H "Content-type:application/json"
"http://127.0.0.1:19998/api/server/127.0.0.1-19999/decommission"
```
#### Cancel decommission single shuffle server example:
```
curl -XPOST -H "Content-type:application/json"
"http://127.0.0.1:19998/api/server/127.0.0.1-19999/cancelDecommission"
```
#### Fetch single shuffle server:
```
curl -X GET http://127.0.0.1:19998/api/server/nodes/127.0.0.1-19999
```
#### Fetch server list:
```
#path: /api/server/nodes[?status={status}]
curl "http://127.0.0.1:19998/api/server/nodes"
curl "http://127.0.0.1:19998/api/server/nodes?status=DECOMMISSIONING"
curl "http://127.0.0.1:19998/api/server/nodes?status=ACTIVE"
```
### How was this patch tested?
UT
Co-authored-by: xianjingfeng <[email protected]>
---
.../apache/uniffle/common/metrics/TestUtils.java | 10 ++-
.../coordinator/web/resource/AdminResource.java | 19 +++---
.../coordinator/web/resource/BaseResource.java | 32 ++++++++++
.../coordinator/web/resource/ServerResource.java | 72 ++++++++++++---------
docs/coordinator_guide.md | 64 +++++++++++++++++--
.../java/org/apache/uniffle/test/ServletTest.java | 73 ++++++++++++++++++----
6 files changed, 216 insertions(+), 54 deletions(-)
diff --git
a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
index f2511eac..c08d09e6 100644
--- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
+++ b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
@@ -42,6 +42,10 @@ public class TestUtils {
return content.toString();
}
+ public static String httpPost(String urlString) throws IOException {
+ return httpPost(urlString, null);
+ }
+
public static String httpPost(String urlString, String postData) throws
IOException {
URL url = new URL(urlString);
HttpURLConnection con = (HttpURLConnection) url.openConnection();
@@ -49,8 +53,10 @@ public class TestUtils {
con.setRequestMethod("POST");
con.setRequestProperty("Content-type", "application/json");
StringBuilder content = new StringBuilder();
- try (OutputStream outputStream = con.getOutputStream(); ) {
- outputStream.write(postData.getBytes());
+ try (OutputStream outputStream = con.getOutputStream()) {
+ if (postData != null) {
+ outputStream.write(postData.getBytes());
+ }
try (BufferedReader in = new BufferedReader(new
InputStreamReader(con.getInputStream())); ) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
index c235565a..d895c1df 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/AdminResource.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.coordinator.web.resource;
import java.util.List;
import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
import org.apache.hbase.thirdparty.javax.ws.rs.GET;
import org.apache.hbase.thirdparty.javax.ws.rs.Path;
@@ -35,20 +34,22 @@ import
org.apache.uniffle.coordinator.access.checker.AccessChecker;
import org.apache.uniffle.coordinator.web.Response;
@Produces({MediaType.APPLICATION_JSON})
-public class AdminResource {
+public class AdminResource extends BaseResource {
private static final Logger LOG =
LoggerFactory.getLogger(AdminResource.class);
- @Context private HttpServletRequest httpRequest;
@Context protected ServletContext servletContext;
@GET
@Path("/refreshChecker")
public Response<List<ServerNode>> refreshChecker() {
- List<AccessChecker> accessCheckers =
getAccessManager().getAccessCheckers();
- LOG.info(
- "The access checker {} has been refreshed, you can add the checker via
rss.coordinator.access.checkers.",
- accessCheckers);
- accessCheckers.forEach(AccessChecker::refreshAccessChecker);
- return Response.success(null);
+ return execute(
+ () -> {
+ List<AccessChecker> accessCheckers =
getAccessManager().getAccessCheckers();
+ LOG.info(
+ "The access checker {} has been refreshed, you can add the
checker via rss.coordinator.access.checkers.",
+ accessCheckers);
+ accessCheckers.forEach(AccessChecker::refreshAccessChecker);
+ return null;
+ });
}
private AccessManager getAccessManager() {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BaseResource.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BaseResource.java
new file mode 100644
index 00000000..861f91ba
--- /dev/null
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BaseResource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.resource;
+
+import java.util.concurrent.Callable;
+
+import org.apache.uniffle.coordinator.web.Response;
+
+public abstract class BaseResource {
+ protected <T> Response<T> execute(Callable<T> callable) {
+ try {
+ return Response.success(callable.call());
+ } catch (Throwable e) {
+ return Response.fail(e.getMessage());
+ }
+ }
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
index 6899b383..6d58e7b6 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
@@ -27,6 +27,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.hbase.thirdparty.javax.ws.rs.GET;
import org.apache.hbase.thirdparty.javax.ws.rs.POST;
import org.apache.hbase.thirdparty.javax.ws.rs.Path;
+import org.apache.hbase.thirdparty.javax.ws.rs.PathParam;
import org.apache.hbase.thirdparty.javax.ws.rs.Produces;
import org.apache.hbase.thirdparty.javax.ws.rs.QueryParam;
import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
@@ -40,13 +41,18 @@ import
org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
@Produces({MediaType.APPLICATION_JSON})
-public class ServerResource {
+public class ServerResource extends BaseResource {
@Context protected ServletContext servletContext;
+ @GET
+ @Path("/nodes/{id}")
+ public Response<ServerNode> node(@PathParam("id") String id) {
+ return execute(() -> getClusterManager().getServerNodeById(id));
+ }
+
@GET
@Path("/nodes")
- public Response<List<ServerNode>> nodes(
- @QueryParam("id") String id, @QueryParam("status") String status) {
+ public Response<List<ServerNode>> nodes(@QueryParam("status") String status)
{
ClusterManager clusterManager = getClusterManager();
List<ServerNode> serverList;
if (ServerStatus.UNHEALTHY.name().equalsIgnoreCase(status)) {
@@ -60,9 +66,6 @@ public class ServerResource {
serverList.stream()
.filter(
server -> {
- if (id != null && !id.equals(server.getId())) {
- return false;
- }
if (status != null &&
!server.getStatus().toString().equals(status)) {
return false;
}
@@ -75,34 +78,47 @@ public class ServerResource {
@POST
@Path("/cancelDecommission")
- @Produces({MediaType.APPLICATION_JSON})
public Response<Object> cancelDecommission(CancelDecommissionRequest params)
{
- if (CollectionUtils.isEmpty(params.getServerIds())) {
- return Response.fail("Parameter[serverIds] should not be null!");
- }
- ClusterManager clusterManager = getClusterManager();
- try {
- params.getServerIds().forEach(clusterManager::cancelDecommission);
- } catch (Exception e) {
- return Response.fail(e.getMessage());
- }
- return Response.success(null);
+ return execute(
+ () -> {
+ assert CollectionUtils.isNotEmpty(params.getServerIds())
+ : "Parameter[serverIds] should not be null!";
+
params.getServerIds().forEach(getClusterManager()::cancelDecommission);
+ return null;
+ });
+ }
+
+ @POST
+ @Path("/{id}/cancelDecommission")
+ public Response<Object> cancelDecommission(@PathParam("id") String serverId)
{
+ return execute(
+ () -> {
+ getClusterManager().cancelDecommission(serverId);
+ return null;
+ });
}
@POST
@Path("/decommission")
- @Produces({MediaType.APPLICATION_JSON})
public Response<Object> decommission(DecommissionRequest params) {
- if (CollectionUtils.isEmpty(params.getServerIds())) {
- return Response.fail("Parameter[serverIds] should not be null!");
- }
- ClusterManager clusterManager = getClusterManager();
- try {
- params.getServerIds().forEach(clusterManager::decommission);
- } catch (Exception e) {
- return Response.fail(e.getMessage());
- }
- return Response.success(null);
+ return execute(
+ () -> {
+ assert CollectionUtils.isNotEmpty(params.getServerIds())
+ : "Parameter[serverIds] should not be null!";
+ params.getServerIds().forEach(getClusterManager()::decommission);
+ return null;
+ });
+ }
+
+ @POST
+ @Path("/{id}/decommission")
+ @Produces({MediaType.APPLICATION_JSON})
+ public Response<Object> decommission(@PathParam("id") String serverId) {
+ return execute(
+ () -> {
+ getClusterManager().decommission(serverId);
+ return null;
+ });
}
private ClusterManager getClusterManager() {
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index f33d6084..ca317076 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -128,9 +128,27 @@ AccessQuotaChecker is a checker when the number of
concurrent tasks submitted by
|rss.coordinator.quota.default.app.num|5|Default number of apps at user level.|
-## RESTful API(beta)
+## RESTful API
-### Fetch Shuffle servers
+### Fetch single shuffle server
+
+<details>
+ <summary><code>GET</code> <code><b>/api/server/nodes/{id}</b></code>
</summary>
+
+##### Parameters
+
+> |name|type|data type|description|
+> |----|----|---------|-----------|
+> |id|required|string|shuffle server id, eg:127.0.0.1-19999|
+##### Example cURL
+
+> ```bash
+> curl -X GET http://localhost:19998/api/server/nodes/127.0.0.1-19999
+> ```
+</details>
+
+
+### Fetch shuffle servers
<details>
<summary><code>GET</code> <code><b>/api/server/nodes</b></code> </summary>
@@ -139,13 +157,13 @@ AccessQuotaChecker is a checker when the number of
concurrent tasks submitted by
> |name|type|data type|description|
> |----|----|---------|-----------|
-> |id|required|string|shuffle server id, eg:127.0.0.1:19999|
> |status|optional|string|Shuffle server status, eg:ACTIVE, DECOMMISSIONING,
> DECOMMISSIONED|
##### Example cURL
> ```bash
> curl -X GET http://localhost:19998/api/server/nodes
+> curl -X GET http://localhost:19998/api/server/nodes?status=ACTIVE
> ```
</details>
@@ -158,7 +176,7 @@ AccessQuotaChecker is a checker when the number of
concurrent tasks submitted by
> |name|type| data type |description|
> |----|-------------------|---------|-----------|
-> |serverIds|required| array |Shuffle server array, eg:["127.0.0.1:19999"]|
+> |serverIds|required| array |Shuffle server array, eg:["127.0.0.1-19999"]|
>
##### Example cURL
@@ -168,6 +186,25 @@ AccessQuotaChecker is a checker when the number of
concurrent tasks submitted by
</details>
+### Decommission single shuffle server
+
+<details>
+ <summary><code>POST</code> <code><b>/api/server/{id}/decommission</b></code>
</summary>
+
+##### Parameters
+
+> | name |type| data type | description |
+>
|------|-------------------|-----------|--------------------------------------|
+> | id |required| string | Shuffle server id, eg:127.0.0.1-19999 |
+>
+##### Example cURL
+
+> ```bash
+> curl -X POST -H "Content-Type: application/json"
http://localhost:19998/api/server/127.0.0.1-19999/decommission
+> ```
+</details>
+
+
### Cancel decommission shuffle servers
<details>
@@ -184,4 +221,23 @@ AccessQuotaChecker is a checker when the number of
concurrent tasks submitted by
> ```bash
> curl -X POST -H "Content-Type: application/json"
> http://localhost:19998/api/server/cancelDecommission -d '{"serverIds:":
> ["127.0.0.1:19999"]}'
> ```
+</details>
+
+
+### Cancel decommission single shuffle server
+
+<details>
+ <summary><code>POST</code>
<code><b>/api/server/{id}/cancelDecommission</b></code> </summary>
+
+##### Parameters
+
+> |name|type| data type | description |
+> |----|-------------------|--------|-----------------------------------------|
+> |id|required| string | Shuffle server id, eg:127.0.0.1-19999 |
+>
+##### Example cURL
+
+> ```bash
+> curl -X POST -H "Content-Type: application/json"
http://localhost:19998/api/server/127.0.0.1-19999/cancelDecommission
+> ```
</details>
\ No newline at end of file
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index a4b6ffd1..abdd0673 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -59,11 +59,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ServletTest extends IntegrationTestBase {
private static final String URL_PREFIX = "http://127.0.0.1:12345/api/";
+ private static final String SINGLE_NODE_URL = URL_PREFIX + "server/nodes/%s";
private static final String NODES_URL = URL_PREFIX + "server/nodes";
private static final String LOSTNODES_URL = URL_PREFIX +
"server/nodes?status=LOST";
private static final String UNHEALTHYNODES_URL = URL_PREFIX +
"server/nodes?status=UNHEALTHY";
private static final String DECOMMISSION_URL = URL_PREFIX +
"server/decommission";
private static final String CANCEL_DECOMMISSION_URL = URL_PREFIX +
"server/cancelDecommission";
+ private static final String DECOMMISSION_SINGLENODE_URL = URL_PREFIX +
"server/%s/decommission";
+ private static final String CANCEL_DECOMMISSION_SINGLENODE_URL =
+ URL_PREFIX + "server/%s/cancelDecommission";
private static CoordinatorServer coordinatorServer;
private ObjectMapper objectMapper = new ObjectMapper();
@@ -112,6 +116,18 @@ public class ServletTest extends IntegrationTestBase {
.until(() -> coordinatorServer.getClusterManager().list().size() == 4);
}
+ @Test
+ public void testGetSingleNode() throws Exception {
+ ShuffleServer shuffleServer = shuffleServers.get(0);
+ String content = TestUtils.httpGet(String.format(SINGLE_NODE_URL,
shuffleServer.getId()));
+ Response<HashMap<String, Object>> response =
+ objectMapper.readValue(content, new
TypeReference<Response<HashMap<String, Object>>>() {});
+ HashMap<String, Object> server = response.getData();
+ assertEquals(0, response.getCode());
+ assertEquals(SHUFFLE_SERVER_PORT,
Integer.parseInt(server.get("grpcPort").toString()));
+ assertEquals(ServerStatus.ACTIVE.toString(), server.get("status"));
+ }
+
@Test
public void testNodesServlet() throws Exception {
String content = TestUtils.httpGet(NODES_URL);
@@ -127,16 +143,6 @@ public class ServletTest extends IntegrationTestBase {
assertEquals(
SHUFFLE_SERVER_PORT + 1,
Integer.parseInt(serverList.get(1).get("grpcPort").toString()));
assertEquals(ServerStatus.ACTIVE.toString(),
serverList.get(1).get("status"));
-
- // Only fetch one server.
- ShuffleServer shuffleServer = shuffleServers.get(0);
- content = TestUtils.httpGet(NODES_URL + "?id=" + shuffleServer.getId());
- response =
- objectMapper.readValue(
- content, new TypeReference<Response<List<HashMap<String,
Object>>>>() {});
- serverList = response.getData();
- assertEquals(1, serverList.size());
- assertEquals(shuffleServer.getId(), serverList.get(0).get("id"));
}
@Test
@@ -198,7 +204,7 @@ public class ServletTest extends IntegrationTestBase {
String content =
TestUtils.httpPost(
CANCEL_DECOMMISSION_URL,
objectMapper.writeValueAsString(decommissionRequest));
- Response<Object> response = objectMapper.readValue(content,
Response.class);
+ Response<?> response = objectMapper.readValue(content, Response.class);
assertEquals(-1, response.getCode());
assertNotNull(response.getErrMsg());
CancelDecommissionRequest cancelDecommissionRequest = new
CancelDecommissionRequest();
@@ -240,4 +246,49 @@ public class ServletTest extends IntegrationTestBase {
assertEquals(0, response.getCode());
assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
}
+
+ @Test
+ public void testDecommissionSingleNode() throws Exception {
+ ShuffleServer shuffleServer = shuffleServers.get(0);
+ assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+ String content =
+ TestUtils.httpPost(String.format(CANCEL_DECOMMISSION_SINGLENODE_URL,
"not_exist_serverId"));
+ Response<?> response = objectMapper.readValue(content, Response.class);
+ assertEquals(-1, response.getCode());
+ assertNotNull(response.getErrMsg());
+ content =
+ TestUtils.httpPost(
+ String.format(CANCEL_DECOMMISSION_SINGLENODE_URL,
shuffleServer.getId()));
+ response = objectMapper.readValue(content, Response.class);
+ assertEquals(0, response.getCode());
+
+ // Register shuffle, avoid server exiting immediately.
+ ShuffleServerGrpcClient shuffleServerClient =
+ new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+ shuffleServerClient.registerShuffle(
+ new RssRegisterShuffleRequest(
+ "testDecommissionServlet_appId", 0, Lists.newArrayList(new
PartitionRange(0, 1)), ""));
+ content = TestUtils.httpPost(String.format(DECOMMISSION_SINGLENODE_URL,
shuffleServer.getId()));
+ response = objectMapper.readValue(content, Response.class);
+ assertEquals(0, response.getCode());
+ assertEquals(ServerStatus.DECOMMISSIONING,
shuffleServer.getServerStatus());
+
+ // Wait until shuffle server send heartbeat to coordinator.
+ Awaitility.await()
+ .timeout(10, TimeUnit.SECONDS)
+ .until(
+ () ->
+ ServerStatus.DECOMMISSIONING.equals(
+ coordinatorServer
+ .getClusterManager()
+ .getServerNodeById(shuffleServer.getId())
+ .getStatus()));
+ // Cancel decommission.
+ content =
+ TestUtils.httpPost(
+ String.format(CANCEL_DECOMMISSION_SINGLENODE_URL,
shuffleServer.getId()));
+ response = objectMapper.readValue(content, Response.class);
+ assertEquals(0, response.getCode());
+ assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+ }
}