This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1406e1fc [#837] feat: Display information of all application through
Cli. (#964)
1406e1fc is described below
commit 1406e1fceeeedd87820033513da941964dbdbd74
Author: slfan1989 <[email protected]>
AuthorDate: Fri Aug 11 17:21:25 2023 +0800
[#837] feat: Display information of all application through Cli. (#964)
### What changes were proposed in this pull request?
> uniffle client-cli -host localhost -port 9526 --applications --app-list
application_1,application_2 --appId-regex application_1 -o JSON
```
sh-3.2# ./uniffle client-cli -host localhost -port 9526 --applications
--app-list application_1,application_2 --appId-regex application_1 -o JSON
2023-08-08 18:10:29,130 INFO uniffle.AbstractCustomCommandLine: connected
to coordinator: http://localhost:9526.
2023-08-08 18:10:29,517 INFO cli.UniffleCLI: uniffle-client-cli : get
applications
application:
[{"applicationId":"application_1","user":"test","lastHeartBeatTime":"2023-08-08
18:10:23"}]
```
> uniffle client-cli -host localhost -port 9526 --applications -o JSON
```
sh-3.2# ./uniffle client-cli -host localhost -port 9526 --applications -o
JSON
2023-08-08 18:15:06,737 INFO uniffle.AbstractCustomCommandLine: connected
to coordinator: http://localhost:9526.
2023-08-08 18:15:07,149 INFO cli.UniffleCLI: uniffle-client-cli : get
applications
application:
[{"applicationId":"application_0","user":"test","lastHeartBeatTime":"2023-08-08
18:14:51"},{"applicationId":"application_1","user":"test","lastHeartBeatTime":"2023-08-08
18:14:51"},{"applicationId":"application_10","user":"test","lastHeartBeatTime":"2023-08-08
18:14:51"},{"applicationId":"application_100","user":"test","lastHeartBeatTime":"2023-08-08
18:14:51"},{"applicationId":"application_1000","user":"test","lastHeartBeatTime":"2023-08-08
18:14:51"},{"applicationId":"a [...]
```
> uniffle client-cli -host localhost -port 9526 --applications
```
sh-3.2# ./uniffle client-cli -host localhost -port 9526 --applications
2023-08-08 18:16:32,561 INFO uniffle.AbstractCustomCommandLine: connected
to coordinator: http://localhost:9526.
2023-08-08 18:16:33,066 INFO cli.UniffleCLI: uniffle-client-cli : get
applications
+-------------------------------------------------------------------+
| Uniffle Applications |
+------------------+------+---------------------+-------------------+
| ApplicationId | User | Last HeartBeatTime | RemoteStoragePath |
+------------------+------+---------------------+-------------------+
| application_0 | test | 2023-08-08 18:16:30 | null |
| application_1 | test | 2023-08-08 18:16:30 | null |
| application_10 | test | 2023-08-08 18:16:30 | null |
| application_100 | test | 2023-08-08 18:16:30 | null |
| application_1000 | test | 2023-08-08 18:16:30 | null |
| application_1001 | test | 2023-08-08 18:16:30 | null |
| application_1002 | test | 2023-08-08 18:16:30 | null |
| application_1003 | test | 2023-08-08 18:16:30 | null |
| application_1004 | test | 2023-08-08 18:16:30 | null |
| application_1005 | test | 2023-08-08 18:16:30 | null |
+------------------+------+---------------------+-------------------+
```
### Why are the changes needed?
fixes #837 [Subtask] Display information of all application through Cli.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added JUnit tests and verified the change in a test environment.
---
.../apache/uniffle/AbstractCustomCommandLine.java | 35 ++
.../apache/uniffle/UniffleCliArgsException.java | 5 +
.../java/org/apache/uniffle/api/AdminRestApi.java | 113 +++++
.../org/apache/uniffle/cli/UniffleAdminCLI.java | 40 +-
.../java/org/apache/uniffle/cli/UniffleCLI.java | 241 ++++++++++-
.../java/org/apache/uniffle/client/RestClient.java | 2 +
.../org/apache/uniffle/client/RestClientImpl.java | 25 +-
.../ApplicationResponse.java} | 36 +-
.../org/apache/uniffle/cli/UniffleTestCLI.java | 23 +-
.../org/apache/uniffle/common/Application.java | 154 +++++++
.../uniffle/coordinator/ApplicationManager.java | 126 ++++++
.../uniffle/coordinator/CoordinatorServer.java | 3 +-
.../web/request/ApplicationRequest.java | 85 ++++
.../coordinator/web/resource/ServerResource.java | 47 +++
.../uniffle/coordinator/GenericTestUtils.java | 66 +++
.../coordinator/web/CoordinatorTestServer.java | 62 +++
.../coordinator/web/UniffleJavaProcess.java | 68 +++
.../coordinator/web/UniffleServicesRESTTest.java | 457 +++++++++++++++++++++
18 files changed, 1529 insertions(+), 59 deletions(-)
diff --git
a/cli/src/main/java/org/apache/uniffle/AbstractCustomCommandLine.java
b/cli/src/main/java/org/apache/uniffle/AbstractCustomCommandLine.java
index 1e0a27ad..d51bb8a8 100644
--- a/cli/src/main/java/org/apache/uniffle/AbstractCustomCommandLine.java
+++ b/cli/src/main/java/org/apache/uniffle/AbstractCustomCommandLine.java
@@ -20,12 +20,26 @@ package org.apache.uniffle;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.UniffleRestClient;
public abstract class AbstractCustomCommandLine implements CustomCommandLine {
+ protected final Option coordinatorHost =
+ new Option("host", "coordinatorHost", true, "This is coordinator server
host.");
+ protected final Option coordinatorPort =
+ new Option("port", "coordinatorPort", true, "This is coordinator server
port.");
+ protected final Option ssl = new Option(null, "ssl", false, "use SSL");
+
+ protected UniffleRestClient client;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractCustomCommandLine.class);
+
protected void printUsage() {
System.out.println("Usage:");
HelpFormatter formatter = new HelpFormatter();
@@ -78,4 +92,25 @@ public abstract class AbstractCustomCommandLine implements
CustomCommandLine {
addRunOptions(options);
return parse(options, args, stopAtNonOptions);
}
+
+ /**
+  * Registers the options declared on this class: the coordinator host, the coordinator port
+  * and the SSL switch.
+  *
+  * @param baseOptions the option set that receives the three options
+  */
+ @Override
+ public void addGeneralOptions(Options baseOptions) {
+   baseOptions.addOption(coordinatorHost).addOption(coordinatorPort).addOption(ssl);
+ }
+
+ /**
+  * Builds the REST client for the coordinator named on the command line and stores it in the
+  * {@code client} field.
+  *
+  * <p>The host and port values are read without a null check, so callers are expected to
+  * verify that both options are present first. The port is parsed with
+  * {@link Integer#parseInt(String)}, which throws {@link NumberFormatException} for
+  * non-numeric input.
+  *
+  * @param cmd the parsed command line
+  */
+ protected void getUniffleRestClient(CommandLine cmd) {
+   String host = cmd.getOptionValue(coordinatorHost.getOpt()).trim();
+   String portValue = cmd.getOptionValue(coordinatorPort.getOpt()).trim();
+   int port = Integer.parseInt(portValue);
+   System.out.println("host:" + host + ",port:" + port);
+   // The ssl flag only decides which URL scheme is used.
+   String urlPattern = cmd.hasOption(ssl.getLongOpt()) ? "https://%s:%d" : "http://%s:%d";
+   String hostUrl = String.format(urlPattern, host, port);
+   LOG.info("connected to coordinator: {}.", hostUrl);
+   client = UniffleRestClient.builder(hostUrl).build();
+ }
}
diff --git a/cli/src/main/java/org/apache/uniffle/UniffleCliArgsException.java
b/cli/src/main/java/org/apache/uniffle/UniffleCliArgsException.java
index edc1d643..fe1ca01f 100644
--- a/cli/src/main/java/org/apache/uniffle/UniffleCliArgsException.java
+++ b/cli/src/main/java/org/apache/uniffle/UniffleCliArgsException.java
@@ -17,6 +17,11 @@
package org.apache.uniffle;
+/**
+ * Indicates an error related to the command-line arguments of the Uniffle program. It is
+ * intended for problems encountered while parsing, validating, or processing the arguments
+ * provided by the user.
+ */
public class UniffleCliArgsException extends Exception {
private static final long serialVersionUID = 1L;
diff --git a/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java
b/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java
index 0f81757c..c750fff4 100644
--- a/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java
+++ b/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java
@@ -17,13 +17,30 @@
package org.apache.uniffle.api;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.uniffle.client.RestClient;
import org.apache.uniffle.client.UniffleRestClient;
+import org.apache.uniffle.common.Application;
+import org.apache.uniffle.entity.ApplicationResponse;
public class AdminRestApi {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AdminRestApi.class);
+
private UniffleRestClient client;
private AdminRestApi() {}
@@ -37,6 +54,102 @@ public class AdminRestApi {
return this.getClient().get("/api/admin/refreshChecker", params, null);
}
+ /**
+  * Queries the coordinator for applications and converts the JSON response into objects.
+  *
+  * <p>The arguments are passed unchanged to
+  * {@link #getApplicationsJson(String, String, String, String, String)}.
+  *
+  * @param applications comma-separated application ids, may be blank
+  * @param applicationIdRegex regular expression filter on the application id, may be blank
+  * @param pageSize page size as a decimal string, may be blank
+  * @param currentPage current page as a decimal string, may be blank
+  * @param heartBeatTimeRange heartbeat time range filter, may be blank
+  * @return the applications found in the response; an empty list when the response is blank or
+  *     carries no data
+  * @throws JsonProcessingException if the response cannot be parsed
+  */
+ public List<Application> getApplications(
+     String applications,
+     String applicationIdRegex,
+     String pageSize,
+     String currentPage,
+     String heartBeatTimeRange)
+     throws JsonProcessingException {
+   String responseJson =
+       getApplicationsJson(
+           applications, applicationIdRegex, pageSize, currentPage, heartBeatTimeRange);
+   List<Application> results = new ArrayList<>();
+   if (StringUtils.isBlank(responseJson)) {
+     return results;
+   }
+
+   ApplicationResponse response =
+       new ObjectMapper().readValue(responseJson, new TypeReference<ApplicationResponse>() {});
+   if (response == null || response.getData() == null) {
+     return results;
+   }
+
+   results.addAll(response.getData());
+   return results;
+ }
+
+ /**
+  * Posts an application query to {@code /api/server/applications} and returns the raw
+  * response.
+  *
+  * <p>Only non-blank arguments are added to the request parameters.
+  *
+  * @param applications comma-separated application ids; each id is trimmed and blank entries
+  *     are dropped
+  * @param applicationIdRegex regular expression filter on the application id
+  * @param pageSize page size as a decimal string
+  * @param currentPage current page as a decimal string
+  * @param heartBeatTimeRange heartbeat time range of the form {@code "start[,end]"} using the
+  *     pattern {@code yyyy-MM-dd HH:mm:ss}
+  * @return the response returned by the REST client
+  * @throws NumberFormatException if {@code pageSize} or {@code currentPage} is not blank and
+  *     not a valid integer
+  */
+ public String getApplicationsJson(
+     String applications,
+     String applicationIdRegex,
+     String pageSize,
+     String currentPage,
+     String heartBeatTimeRange) {
+   Map<String, Object> params = new HashMap<>();
+
+   if (StringUtils.isNotBlank(applications)) {
+     // Trim every id so that "app_1, app_2" selects the same ids as "app_1,app_2".
+     List<String> applicationIds = new ArrayList<>();
+     for (String applicationId : applications.split(",")) {
+       if (StringUtils.isNotBlank(applicationId)) {
+         applicationIds.add(applicationId.trim());
+       }
+     }
+     params.put("applications", applicationIds);
+   }
+
+   if (StringUtils.isNotBlank(applicationIdRegex)) {
+     params.put("appIdRegex", applicationIdRegex);
+   }
+
+   if (StringUtils.isNotBlank(pageSize)) {
+     params.put("pageSize", Integer.valueOf(pageSize));
+   }
+
+   if (StringUtils.isNotBlank(currentPage)) {
+     params.put("currentPage", Integer.valueOf(currentPage));
+   }
+
+   if (StringUtils.isNotBlank(heartBeatTimeRange)) {
+     transform(heartBeatTimeRange, params);
+   }
+
+   return this.getClient().post("/api/server/applications", params, null);
+ }
+
+ /**
+  * Parses a heartbeat time range of the form {@code "start[,end]"} and stores the bounds in
+  * {@code params} as epoch milliseconds under {@code heartBeatStartTime} and
+  * {@code heartBeatEndTime}.
+  *
+  * <p>Either bound may be blank (for example {@code ",2023-08-08 18:00:00"}), in which case
+  * that bound is not added. When the range is malformed, the error is logged and
+  * {@code params} is left untouched.
+  *
+  * @param heartBeatTimeRange the range text; each bound uses the pattern
+  *     {@code yyyy-MM-dd HH:mm:ss}
+  * @param params request parameters that receive the parsed bounds
+  */
+ private void transform(String heartBeatTimeRange, Map<String, Object> params) {
+   String[] timeRange = heartBeatTimeRange.split(",");
+   if (timeRange.length > 2) {
+     LOG.error(
+         "transform heartBeatTimeRange error, expected 'start[,end]' but got: {}.",
+         heartBeatTimeRange);
+     return;
+   }
+   String startTimeStr = timeRange.length >= 1 ? timeRange[0] : null;
+   String endTimeStr = timeRange.length == 2 ? timeRange[1] : null;
+
+   try {
+     DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+     // Parse both bounds before touching params so a bad bound never leaves half a filter.
+     Long startTimeMillis = toEpochMillis(startTimeStr, formatter);
+     Long endTimeMillis = toEpochMillis(endTimeStr, formatter);
+
+     if (startTimeMillis != null) {
+       params.put("heartBeatStartTime", startTimeMillis);
+     }
+     if (endTimeMillis != null) {
+       params.put("heartBeatEndTime", endTimeMillis);
+     }
+   } catch (Exception e) {
+     LOG.error("transform heartBeatTimeRange error.", e);
+   }
+ }
+
+ /**
+  * Converts one bound of the time range to epoch milliseconds.
+  *
+  * @param time the bound text, may be null or blank
+  * @param formatter the formatter used to parse the bound
+  * @return the epoch milliseconds, or {@code null} when {@code time} is null or blank
+  */
+ private Long toEpochMillis(String time, DateTimeFormatter formatter) {
+   if (StringUtils.isBlank(time)) {
+     return null;
+   }
+   // NOTE(review): the bound is interpreted as UTC; confirm this matches the time zone in
+   // which the coordinator renders the last heartbeat time shown to the user.
+   return LocalDateTime.parse(time.trim(), formatter).toInstant(ZoneOffset.UTC).toEpochMilli();
+ }
+
private RestClient getClient() {
return this.client.getHttpClient();
}
diff --git a/cli/src/main/java/org/apache/uniffle/cli/UniffleAdminCLI.java
b/cli/src/main/java/org/apache/uniffle/cli/UniffleAdminCLI.java
index 8d9b3a61..2ef5963e 100644
--- a/cli/src/main/java/org/apache/uniffle/cli/UniffleAdminCLI.java
+++ b/cli/src/main/java/org/apache/uniffle/cli/UniffleAdminCLI.java
@@ -34,15 +34,11 @@ public class UniffleAdminCLI extends
AbstractCustomCommandLine {
private final Options allOptions;
private final Option refreshCheckerCli;
- private final Option coordinatorHost;
- private final Option coordinatorPort;
- private final Option ssl;
-
private final Option help;
- protected UniffleRestClient client;
public UniffleAdminCLI(String shortPrefix, String longPrefix) {
allOptions = new Options();
+
refreshCheckerCli =
new Option(
shortPrefix + "r",
@@ -52,24 +48,8 @@ public class UniffleAdminCLI extends
AbstractCustomCommandLine {
help =
new Option(
shortPrefix + "h", longPrefix + "help", false, "Help for the
Uniffle Admin CLI.");
- coordinatorHost =
- new Option(
- shortPrefix + "s",
- longPrefix + "coordinatorHost",
- true,
- "This is coordinator server host.");
- coordinatorPort =
- new Option(
- shortPrefix + "p",
- longPrefix + "coordinatorPort",
- true,
- "This is coordinator server port.");
- ssl = new Option(null, longPrefix + "ssl", false, "use SSL.");
allOptions.addOption(refreshCheckerCli);
- allOptions.addOption(coordinatorHost);
- allOptions.addOption(coordinatorPort);
- allOptions.addOption(ssl);
allOptions.addOption(help);
}
@@ -92,16 +72,7 @@ public class UniffleAdminCLI extends
AbstractCustomCommandLine {
}
if (cmd.hasOption(coordinatorHost.getOpt()) &&
cmd.hasOption(coordinatorPort.getOpt())) {
- String host = cmd.getOptionValue(coordinatorHost.getOpt()).trim();
- int port =
Integer.parseInt(cmd.getOptionValue(coordinatorPort.getOpt()).trim());
- String hostUrl;
- if (cmd.hasOption(ssl.getOpt())) {
- hostUrl = String.format("https://%s:%d", host, port);
- } else {
- hostUrl = String.format("http://%s:%d", host, port);
- }
- LOG.info("connected to coordinator: {}.", hostUrl);
- client = UniffleRestClient.builder(hostUrl).build();
+ getUniffleRestClient(cmd);
}
if (cmd.hasOption(refreshCheckerCli.getOpt())) {
@@ -123,10 +94,9 @@ public class UniffleAdminCLI extends
AbstractCustomCommandLine {
@Override
public void addRunOptions(Options baseOptions) {
- baseOptions.addOption(refreshCheckerCli);
- baseOptions.addOption(coordinatorHost);
- baseOptions.addOption(coordinatorPort);
- baseOptions.addOption(ssl);
+ for (Object option : allOptions.getOptions()) {
+ baseOptions.addOption((Option) option);
+ }
}
@Override
diff --git a/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java
b/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java
index f2bc1f4b..7c7c66fb 100644
--- a/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java
+++ b/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java
@@ -17,25 +17,53 @@
package org.apache.uniffle.cli;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.gson.Gson;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.AbstractCustomCommandLine;
import org.apache.uniffle.UniffleCliArgsException;
+import org.apache.uniffle.api.AdminRestApi;
+import org.apache.uniffle.common.Application;
public class UniffleCLI extends AbstractCustomCommandLine {
private static final Logger LOG = LoggerFactory.getLogger(UniffleCLI.class);
+ private static final int EXIT_SUCCESS = 0;
+ private static final int EXIT_ERROR = 1;
private final Options allOptions;
private final Option uniffleClientCli;
private final Option uniffleAdminCli;
+ private final Option uniffleApplicationCli;
+ private final Option uniffleApplicationRegex;
+ private final Option uniffleApplicationListCli;
+ private final Option uniffleApplicationPageSize;
+ private final Option uniffleApplicationCurrentPage;
+ private final Option uniffleApplicationHbTimeRange;
+ private final Option uniffleOutFormat;
+ private final Option uniffleOutPutFile;
+ // private final Option uniffleLimit;
private final Option help;
+ private static final List<String> APPLICATIONS_HEADER =
+ Arrays.asList("ApplicationId", "User", "Last HeartBeatTime",
"RemoteStoragePath");
+
public UniffleCLI(String shortPrefix, String longPrefix) {
allOptions = new Options();
+
uniffleClientCli =
new Option(
shortPrefix + "c",
@@ -48,15 +76,65 @@ public class UniffleCLI extends AbstractCustomCommandLine {
longPrefix + "admin",
true,
"This is an admin command that will print args.");
+ // Options of the "applications" sub command: query filters, pagination and output control.
+ uniffleApplicationCli =
+     new Option(
+         shortPrefix + "app",
+         longPrefix + "applications",
+         false,
+         "The command will be used to print a list of applications.");
+ uniffleApplicationListCli =
+     new Option(
+         null, longPrefix + "app-list", true, "We can provide an application query list.");
+ uniffleApplicationRegex =
+     new Option(
+         null, longPrefix + "appId-regex", true, "ApplicationId Regex filter expression.");
+ // The value of this option is sent as the page size, so the help text must say so.
+ uniffleApplicationPageSize =
+     new Option(null, longPrefix + "app-pageSize", true, "Application pagination page size");
+ uniffleApplicationCurrentPage =
+     new Option(
+         null, longPrefix + "app-currentPage", true, "Application pagination current page");
+ uniffleApplicationHbTimeRange =
+     new Option(
+         null, longPrefix + "app-heartbeatTimeRange", true, "Application Heartbeat TimeRange");
+ uniffleOutFormat =
+     new Option(
+         shortPrefix + "o",
+         longPrefix + "output-format",
+         true,
+         "We can use the -o|--output-format json option to output application information to json."
+             + " We currently only support output in Json format");
+ uniffleOutPutFile =
+     new Option(
+         shortPrefix + "f",
+         longPrefix + "file",
+         true,
+         "We can use the -f|--file to output information to file");
help = new Option(shortPrefix + "h", longPrefix + "help", false, "Help for
the Uniffle CLI.");
+
allOptions.addOption(uniffleClientCli);
allOptions.addOption(uniffleAdminCli);
+ allOptions.addOption(uniffleApplicationCli);
+ allOptions.addOption(uniffleApplicationListCli);
+ allOptions.addOption(uniffleApplicationRegex);
+ allOptions.addOption(uniffleApplicationPageSize);
+ allOptions.addOption(uniffleApplicationCurrentPage);
+ allOptions.addOption(coordinatorHost);
+ allOptions.addOption(coordinatorPort);
+ allOptions.addOption(uniffleOutFormat);
+ allOptions.addOption(uniffleOutPutFile);
+ allOptions.addOption(uniffleApplicationHbTimeRange);
+ allOptions.addOption(ssl);
allOptions.addOption(help);
}
- public int run(String[] args) throws UniffleCliArgsException {
+ public int run(String[] args) throws UniffleCliArgsException,
JsonProcessingException {
final CommandLine cmd = parseCommandLineOptions(args, true);
+ if (args.length < 1) {
+ printUsage();
+ return EXIT_ERROR;
+ }
+
if (cmd.hasOption(help.getOpt())) {
printUsage();
return 0;
@@ -65,29 +143,180 @@ public class UniffleCLI extends AbstractCustomCommandLine
{
if (cmd.hasOption(uniffleClientCli.getOpt())) {
String cliArgs = cmd.getOptionValue(uniffleClientCli.getOpt());
System.out.println("uniffle-client-cli : " + cliArgs);
- return 0;
+ return EXIT_SUCCESS;
}
if (cmd.hasOption(uniffleAdminCli.getOpt())) {
String cliArgs = cmd.getOptionValue(uniffleAdminCli.getOpt());
System.out.println("uniffle-admin-cli : " + cliArgs);
- return 0;
+ return EXIT_SUCCESS;
}
- return 1;
+ // Everything below talks to the coordinator, so both the host and the port are required.
+ if (cmd.hasOption(coordinatorHost.getOpt()) && cmd.hasOption(coordinatorPort.getOpt())) {
+   getUniffleRestClient(cmd);
+
+   // If we use application-cli
+   if (cmd.hasOption(uniffleApplicationCli.getLongOpt())) {
+     LOG.info("uniffle-client-cli : get applications");
+
+     // JSON output, optionally also written to a file.
+     if (cmd.hasOption(uniffleOutFormat.getOpt())) {
+
+       // Get the OutFormat.
+       String outPutFormat = cmd.getOptionValue(uniffleOutFormat.getOpt()).trim();
+       if (StringUtils.isBlank(outPutFormat)) {
+         System.out.println("The output format cannot be empty.");
+         return EXIT_ERROR;
+       }
+
+       // We allow users to enter json\Json\JSON etc.
+       // If the user enters another format, we will prompt the user that we only support JSON.
+       if (!StringUtils.equalsAnyIgnoreCase(outPutFormat, "json")) {
+         System.out.println("The output currently supports only JSON format.");
+         return EXIT_ERROR;
+       }
+
+       String json = getApplicationsJson(cmd);
+       if (StringUtils.isBlank(json)) {
+         System.out.println("no output result.");
+         return EXIT_ERROR;
+       }
+       System.out.println("application: " + json);
+
+       if (cmd.hasOption(uniffleOutPutFile.getOpt())) {
+
+         // The file location comes from the -f|--file option, not from the output format.
+         String outputFilePath = cmd.getOptionValue(uniffleOutPutFile.getOpt()).trim();
+         if (StringUtils.isBlank(outputFilePath)) {
+           System.out.println("The output file cannot be empty.");
+           return EXIT_ERROR;
+         }
+
+         try (FileOutputStream fos = new FileOutputStream(outputFilePath);
+             OutputStreamWriter osw = new OutputStreamWriter(fos, StandardCharsets.UTF_8)) {
+           osw.write(json);
+         } catch (IOException e) {
+           LOG.error("Failed to write the applications json data to {}.", outputFilePath, e);
+           System.out.println(
+               "An error occurred while writing the applications json data to the file("
+                   + outputFilePath
+                   + ").");
+           return EXIT_ERROR;
+         }
+         // Reported only after the writer has been closed, i.e. the data has been flushed.
+         System.out.println(
+             "applications json data has been written to the file(" + outputFilePath + ").");
+       }
+
+       return EXIT_SUCCESS;
+     }
+
+     // Table output. The writer is flushed but deliberately not closed, because closing it
+     // would also close System.out.
+     PrintWriter writer =
+         new PrintWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
+     CLIContentUtils formattingCLIUtils =
+         new CLIContentUtils("Uniffle Applications").addHeaders(APPLICATIONS_HEADER);
+     List<Application> applications = getApplications(cmd);
+     if (applications != null) {
+       applications.forEach(
+           app ->
+               formattingCLIUtils.addLine(
+                   app.getApplicationId(),
+                   app.getUser(),
+                   app.getLastHeartBeatTime(),
+                   app.getRemoteStoragePath()));
+     }
+     writer.print(formattingCLIUtils.render());
+     writer.flush();
+     return EXIT_SUCCESS;
+   }
+ }
+
+ return EXIT_ERROR;
}
@Override
public void addRunOptions(Options baseOptions) {
- baseOptions.addOption(uniffleClientCli);
- baseOptions.addOption(uniffleAdminCli);
+ for (Object option : allOptions.getOptions()) {
+ baseOptions.addOption((Option) option);
+ }
}
@Override
public void addGeneralOptions(Options baseOptions) {
+ super.addGeneralOptions(baseOptions);
baseOptions.addOption(help);
}
+ /**
+  * Reads the application filters from the command line and asks the coordinator for the
+  * matching applications.
+  *
+  * @param cmd the parsed command line.
+  * @return the applications returned by {@link AdminRestApi}.
+  * @throws UniffleCliArgsException if no REST client has been created, which is reported as
+  *     missing coordinator host and port parameters.
+  * @throws JsonProcessingException if {@link AdminRestApi} fails to process the JSON content.
+  */
+ private List<Application> getApplications(CommandLine cmd)
+     throws UniffleCliArgsException, JsonProcessingException {
+   if (client == null) {
+     throw new UniffleCliArgsException(
+         "Missing Coordinator host address and grpc port parameters.");
+   }
+
+   // Each filter is optional; an absent option is passed on as null.
+   String applications = trimmedValueOrNull(cmd, uniffleApplicationListCli);
+   String applicationIdRegex = trimmedValueOrNull(cmd, uniffleApplicationRegex);
+   String pageSize = trimmedValueOrNull(cmd, uniffleApplicationPageSize);
+   String currentPage = trimmedValueOrNull(cmd, uniffleApplicationCurrentPage);
+   String heartBeatTimeRange = trimmedValueOrNull(cmd, uniffleApplicationHbTimeRange);
+
+   return new AdminRestApi(client)
+       .getApplications(
+           applications, applicationIdRegex, pageSize, currentPage, heartBeatTimeRange);
+ }
+
+ /**
+  * Returns the trimmed value of {@code option}, looked up by its long name.
+  *
+  * @param cmd the parsed command line.
+  * @param option the option to read.
+  * @return the trimmed value, or {@code null} when the option is not present.
+  */
+ private static String trimmedValueOrNull(CommandLine cmd, Option option) {
+   String longOpt = option.getLongOpt();
+   return cmd.hasOption(longOpt) ? cmd.getOptionValue(longOpt).trim() : null;
+ }
+
+ /**
+  * Fetches the applications selected by the command line and serializes them with Gson.
+  *
+  * @param cmd the parsed command line.
+  * @return the application list as a JSON string.
+  * @throws UniffleCliArgsException propagated from {@code getApplications(cmd)}.
+  * @throws JsonProcessingException propagated from {@code getApplications(cmd)}.
+  */
+ private String getApplicationsJson(CommandLine cmd)
+     throws UniffleCliArgsException, JsonProcessingException {
+   return new Gson().toJson(getApplications(cmd));
+ }
+
public static void main(String[] args) {
int retCode;
try {
diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClient.java
b/cli/src/main/java/org/apache/uniffle/client/RestClient.java
index 617362a5..84bbe021 100644
--- a/cli/src/main/java/org/apache/uniffle/client/RestClient.java
+++ b/cli/src/main/java/org/apache/uniffle/client/RestClient.java
@@ -23,4 +23,6 @@ import java.util.Map;
public interface RestClient extends AutoCloseable, Cloneable {
String get(String path, Map<String, Object> params, String authHeader);
+
+ String post(String path, Map<String, Object> params, String authHeader);
}
diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java
b/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java
index 95730e08..81f17ad2 100644
--- a/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java
+++ b/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java
@@ -17,11 +17,15 @@
package org.apache.uniffle.client;
+import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
@@ -33,6 +37,7 @@ import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
@@ -44,6 +49,8 @@ public class RestClientImpl implements RestClient {
private static final Logger LOG =
LoggerFactory.getLogger(RestClientImpl.class);
private CloseableHttpClient httpclient;
private String baseUrl;
+ private final ObjectMapper mapper =
+ new
ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
public RestClientImpl(String baseUrl, CloseableHttpClient httpclient) {
this.httpclient = httpclient;
@@ -62,6 +69,22 @@ public class RestClientImpl implements RestClient {
return doRequest(buildURI(path, params), authHeader, RequestBuilder.get());
}
+ /**
+  * Sends an HTTP POST to {@code path} with {@code params} serialized to JSON as the body.
+  *
+  * <p>If the body cannot be built, the failure is logged and the request is still sent, just
+  * without an entity.
+  *
+  * @param path request path that is resolved by {@code buildURI}
+  * @param params values serialized into the JSON body
+  * @param authHeader authorization header value passed on to {@code doRequest}
+  * @return the response returned by {@code doRequest}
+  */
+ @Override
+ public String post(String path, Map<String, Object> params, String authHeader) {
+   RequestBuilder postBuilder = RequestBuilder.post();
+   try {
+     // NOTE(review): StringEntity(String) presumably encodes with HttpClient's default
+     // charset rather than UTF-8; confirm that non-ASCII values survive the round trip.
+     postBuilder.setEntity(new StringEntity(mapper.writeValueAsString(params)));
+   } catch (JsonProcessingException e) {
+     LOG.error("params{} to json error.", params, e);
+   } catch (UnsupportedEncodingException e) {
+     LOG.error("params{} to StringEntity error.", params, e);
+   }
+   return doRequest(buildURI(path, null), authHeader, postBuilder);
+ }
+
private String doRequest(URI uri, String authHeader, RequestBuilder
requestBuilder) {
String response;
try {
@@ -108,7 +131,7 @@ public class RestClientImpl implements RestClient {
String url = StringUtils.isNotBlank(path) ? this.baseUrl + "/" + path :
this.baseUrl;
URIBuilder builder = new URIBuilder(url);
- if (!params.isEmpty()) {
+ if (params != null && !params.isEmpty()) {
for (Map.Entry<String, Object> entry : params.entrySet()) {
if (entry.getValue() != null) {
builder.addParameter(entry.getKey(), entry.getValue().toString());
diff --git a/cli/src/main/java/org/apache/uniffle/UniffleCliArgsException.java
b/cli/src/main/java/org/apache/uniffle/entity/ApplicationResponse.java
similarity index 58%
copy from cli/src/main/java/org/apache/uniffle/UniffleCliArgsException.java
copy to cli/src/main/java/org/apache/uniffle/entity/ApplicationResponse.java
index edc1d643..bafb11bb 100644
--- a/cli/src/main/java/org/apache/uniffle/UniffleCliArgsException.java
+++ b/cli/src/main/java/org/apache/uniffle/entity/ApplicationResponse.java
@@ -15,16 +15,38 @@
* limitations under the License.
*/
-package org.apache.uniffle;
+package org.apache.uniffle.entity;
-public class UniffleCliArgsException extends Exception {
- private static final long serialVersionUID = 1L;
+import java.util.List;
- public UniffleCliArgsException(String message) {
- super(message);
+import org.apache.uniffle.common.Application;
+
+public class ApplicationResponse {
+ private List<Application> data;
+ private int code;
+ private String errMsg;
+
+ public List<Application> getData() {
+ return data;
+ }
+
+ public void setData(List<Application> data) {
+ this.data = data;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public String getErrMsg() {
+ return errMsg;
}
- public UniffleCliArgsException(String message, Throwable cause) {
- super(message, cause);
+ public void setErrMsg(String errMsg) {
+ this.errMsg = errMsg;
}
}
diff --git a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java
b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java
index 6fdaf947..8c80a2e7 100644
--- a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java
+++ b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java
@@ -50,15 +50,20 @@ public class UniffleTestCLI {
String[] args1 = {"-help"};
assertEquals(0, uniffleCLI.run(args1));
oldOutPrintStream.println(dataOut);
- assertTrue(
- dataOut
- .toString()
- .contains("-a,--admin <arg> This is an admin command that will
print args."));
- assertTrue(
- dataOut
- .toString()
- .contains("-c,--cli <arg> This is an client cli command that
will print args."));
- assertTrue(dataOut.toString().contains("-h,--help Help for the
Uniffle CLI."));
+
+ String cmdHelpMsg = dataOut.toString();
+ assertTrue(cmdHelpMsg.contains("-a,--admin <arg>"));
+ assertTrue(cmdHelpMsg.contains("This is an admin command that will print
args."));
+ assertTrue(dataOut.toString().contains("-c,--cli <arg>"));
+ assertTrue(dataOut.toString().contains("This is an client cli command that
will print args."));
+ assertTrue(dataOut.toString().contains("-h,--help"));
+ assertTrue(dataOut.toString().contains("Help for the Uniffle CLI."));
+ assertTrue(dataOut.toString().contains("-host,--coordinatorHost <arg>"));
+ assertTrue(dataOut.toString().contains("This is coordinator server
host."));
+ assertTrue(dataOut.toString().contains("-port,--coordinatorPort <arg>"));
+ assertTrue(dataOut.toString().contains("This is coordinator server
port."));
+ assertTrue(dataOut.toString().contains("--ssl"));
+ assertTrue(dataOut.toString().contains("use SSL"));
System.setOut(oldOutPrintStream);
System.setErr(oldErrPrintStream);
diff --git a/common/src/main/java/org/apache/uniffle/common/Application.java
b/common/src/main/java/org/apache/uniffle/common/Application.java
new file mode 100644
index 00000000..68814422
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/Application.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.time.DateFormatUtils;
+
+/**
+ * Plain data holder describing one application known to the coordinator: its id, the owning
+ * user, the last heartbeat time (already formatted as {@code yyyy-MM-dd HH:mm:ss}) and the
+ * remote storage path, if any.
+ *
+ * <p>Natural ordering is by applicationId only, with a null applicationId sorting first.
+ */
+public class Application implements Comparable<Application> {
+
+  private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
+  private String applicationId;
+  private String user;
+  private String lastHeartBeatTime;
+  private String remoteStoragePath;
+
+  /** No-arg constructor; all fields stay null until the setters are called. */
+  public Application() {}
+
+  /** Copies every field from the given builder. */
+  public Application(Builder builder) {
+    this.applicationId = builder.applicationId;
+    this.user = builder.user;
+    this.lastHeartBeatTime = builder.lastHeartBeatTime;
+    this.remoteStoragePath = builder.remoteStoragePath;
+  }
+
+  /** Fluent builder for {@link Application}. */
+  public static class Builder {
+    private String applicationId;
+    private String user;
+    private String lastHeartBeatTime;
+    private String remoteStoragePath;
+
+    public Builder() {}
+
+    public Builder applicationId(String applicationId) {
+      this.applicationId = applicationId;
+      return this;
+    }
+
+    public Builder user(String user) {
+      this.user = user;
+      return this;
+    }
+
+    /** Stores the heartbeat time formatted with {@code DATE_PATTERN}. */
+    public Builder lastHeartBeatTime(long lastHeartBeatTime) {
+      this.lastHeartBeatTime = DateFormatUtils.format(lastHeartBeatTime, DATE_PATTERN);
+      return this;
+    }
+
+    /** Stores the path of the given storage info; a null argument leaves the path unset. */
+    public Builder remoteStoragePath(RemoteStorageInfo remoteStorageInfo) {
+      if (remoteStorageInfo != null) {
+        this.remoteStoragePath = remoteStorageInfo.getPath();
+      }
+      return this;
+    }
+
+    public Application build() {
+      return new Application(this);
+    }
+  }
+
+  public String getApplicationId() {
+    return applicationId;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getLastHeartBeatTime() {
+    return lastHeartBeatTime;
+  }
+
+  public String getRemoteStoragePath() {
+    return remoteStoragePath;
+  }
+
+  public void setApplicationId(String applicationId) {
+    this.applicationId = applicationId;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public void setLastHeartBeatTime(String lastHeartBeatTime) {
+    this.lastHeartBeatTime = lastHeartBeatTime;
+  }
+
+  public void setRemoteStoragePath(String remoteStoragePath) {
+    this.remoteStoragePath = remoteStoragePath;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof Application)) {
+      return false;
+    }
+    // Cast to Application itself: casting to this.getClass() would throw ClassCastException
+    // whenever the receiver is a subclass and the argument is a plain Application.
+    Application otherImpl = (Application) other;
+    return new EqualsBuilder()
+        .append(this.getApplicationId(), otherImpl.getApplicationId())
+        .append(this.getUser(), otherImpl.getUser())
+        .append(this.getLastHeartBeatTime(), otherImpl.getLastHeartBeatTime())
+        .append(this.getRemoteStoragePath(), otherImpl.getRemoteStoragePath())
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder()
+        .append(this.getApplicationId())
+        .append(this.getUser())
+        .append(this.getLastHeartBeatTime())
+        .append(this.getRemoteStoragePath())
+        .toHashCode();
+  }
+
+  /** Orders by applicationId; instances without an id (no-arg constructor) sort first. */
+  @Override
+  public int compareTo(Application other) {
+    if (this.applicationId == null) {
+      return other.applicationId == null ? 0 : -1;
+    }
+    if (other.applicationId == null) {
+      return 1;
+    }
+    return this.applicationId.compareTo(other.applicationId);
+  }
+
+  @Override
+  public String toString() {
+    return "Application{"
+        + "applicationId='"
+        + applicationId
+        + '\''
+        + ", user='"
+        + user
+        + '\''
+        + ", lastHeartBeatTime='"
+        + lastHeartBeatTime
+        + '\''
+        + ", remoteStoragePath='"
+        + remoteStoragePath
+        + '\''
+        + '}';
+  }
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index c2d77230..62642523 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -20,21 +20,26 @@ package org.apache.uniffle.coordinator;
import java.io.Closeable;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.Range;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.Application;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
@@ -365,6 +370,127 @@ public class ApplicationManager implements Closeable {
return storageHost;
}
+  /**
+   * Get Applications. Each returned item carries applicationId, user, lastHeartBeatTime and
+   * remoteStoragePath.
+   *
+   * <p>Three filters are applied and they are in an 'AND' relationship: the explicit
+   * applicationId list, the applicationId regular expression, and the heartbeat time window. A
+   * filter whose argument is empty/blank is skipped. The matches are sorted by applicationId and
+   * a single page of them is returned.
+   *
+   * @param appIds applicationIds to keep; null or empty keeps every application.
+   * @param pageSize Items per page, must not be negative.
+   * @param currentPage 1-based number of the page to be queried.
+   * @param pHeartBeatStartTime heartbeat start time, parsed as a long; blank means unbounded.
+   * @param pHeartBeatEndTime heartbeat end time, parsed as a long; blank means unbounded.
+   * @param appIdRegex applicationId regular expression; blank disables the filter.
+   * @return the requested page; empty when the page lies beyond the last match.
+   * @throws IllegalArgumentException if currentPage is below 1 or pageSize is negative.
+   * @throws NumberFormatException if a non-blank heartbeat bound is not a number.
+   */
+  public List<Application> getApplications(
+      Set<String> appIds,
+      int pageSize,
+      int currentPage,
+      String pHeartBeatStartTime,
+      String pHeartBeatEndTime,
+      String appIdRegex) {
+    if (currentPage < 1 || pageSize < 0) {
+      throw new IllegalArgumentException(
+          "currentPage must be >= 1 and pageSize must be >= 0, but got currentPage = "
+              + currentPage
+              + ", pageSize = "
+              + pageSize
+              + ".");
+    }
+
+    final boolean filterByAppIds = appIds != null && !appIds.isEmpty();
+    final boolean filterByRegex = StringUtils.isNotBlank(appIdRegex);
+    final boolean filterByHeartBeat =
+        StringUtils.isNotBlank(pHeartBeatStartTime) || StringUtils.isNotBlank(pHeartBeatEndTime);
+
+    List<Application> applications = new ArrayList<>();
+    for (Map.Entry<String, Map<String, Long>> entry : currentUserAndApp.entrySet()) {
+      String user = entry.getKey();
+      Map<String, Long> apps = entry.getValue();
+      apps.forEach(
+          (appId, heartBeatTime) -> {
+            // Filter condition 1: Check whether applicationId is included in the filter list.
+            boolean match = !filterByAppIds || appIds.contains(appId);
+
+            // Filter condition 2: whether it meets the applicationId rule.
+            if (match && filterByRegex) {
+              match = matchApplicationId(appId, appIdRegex);
+            }
+
+            // Filter condition 3: heartbeat time window. The result is AND-ed with the earlier
+            // filters instead of replacing them. The helper is evaluated first on purpose so a
+            // malformed bound is always reported, whatever the other filters decided.
+            if (filterByHeartBeat) {
+              match =
+                  matchHeartBeatStartTimeAndEndTime(
+                          pHeartBeatStartTime, pHeartBeatEndTime, heartBeatTime)
+                      && match;
+            }
+
+            // If it meets expectations, add to the list to be returned.
+            if (match) {
+              RemoteStorageInfo remoteStorageInfo = appIdToRemoteStorageInfo.get(appId);
+              applications.add(
+                  new Application.Builder()
+                      .applicationId(appId)
+                      .user(user)
+                      .lastHeartBeatTime(heartBeatTime)
+                      .remoteStoragePath(remoteStorageInfo)
+                      .build());
+            }
+          });
+    }
+    Collections.sort(applications);
+    LOG.info("getApplications >> appIds = {}.", applications);
+
+    // Compute the window in long arithmetic so large page numbers cannot overflow int, and
+    // answer a page past the end with an empty list instead of letting subList throw.
+    long startIndex = (long) (currentPage - 1) * pageSize;
+    if (startIndex >= applications.size()) {
+      return new ArrayList<>();
+    }
+    int endIndex = (int) Math.min(startIndex + pageSize, applications.size());
+    return applications.subList((int) startIndex, endIndex);
+  }
+
+  /**
+   * Tells whether the whole applicationId matches the given regular expression.
+   *
+   * @param applicationId applicationId
+   * @param regex regular expression pattern.
+   * @return true when the regular expression matches the entire applicationId, false otherwise.
+   */
+  private boolean matchApplicationId(String applicationId, String regex) {
+    return Pattern.matches(regex, applicationId);
+  }
+
+ /**
+ * Filter heartbeat time based on query conditions.
+ *
+ * @param pStartTime heartbeat start time.
+ * @param pEndTime heartbeat end time.
+ * @param appHeartBeatTime application HeartBeatTime
+ * @return Returns true if the heartbeat time is within the given query
range, otherwise returns
+ * false.
+ */
+ private boolean matchHeartBeatStartTimeAndEndTime(
+ String pStartTime, String pEndTime, long appHeartBeatTime) {
+ long startTime = 0;
+ long endTime = Long.MAX_VALUE;
+
+ if (StringUtils.isNotBlank(pStartTime)) {
+ startTime = parseLongValue(pStartTime, "heartBeatStartTime");
+ }
+
+ if (StringUtils.isNotBlank(pEndTime)) {
+ endTime = parseLongValue(pEndTime, "heartBeatEndTime");
+ }
+
+ Range<Long> heartBeatTime = Range.between(startTime, endTime);
+ return heartBeatTime.contains(appHeartBeatTime);
+ }
+
+  /**
+   * Convert String to Long.
+   *
+   * @param strValue String Value
+   * @param fieldName Field Name, used to build the error message.
+   * @return Long value.
+   * @throws NumberFormatException if strValue is not a valid long; the message names the field
+   *     and the original parse failure is attached as the cause.
+   */
+  private long parseLongValue(String strValue, String fieldName) {
+    try {
+      return Long.parseLong(strValue);
+    } catch (NumberFormatException e) {
+      // NumberFormatException has no (message, cause) constructor, so attach the cause by hand.
+      NumberFormatException wrapped =
+          new NumberFormatException(fieldName + " value must be a number!");
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+  }
+
public Map<String, Integer> getDefaultUserApps() {
return quotaManager.getDefaultUserApps(); // plain delegation to quotaManager
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 33333dd8..30373500 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -192,6 +192,7 @@ public class CoordinatorServer extends ReconfigurableBase {
"org.apache.uniffle.coordinator.web.resource",
"org.apache.uniffle.common.web.resource");
jettyServer.registerInstance(ClusterManager.class, clusterManager);
jettyServer.registerInstance(AccessManager.class, accessManager);
+ jettyServer.registerInstance(ApplicationManager.class, applicationManager);
jettyServer.registerInstance(
CollectorRegistry.class.getCanonicalName() + "#server",
CoordinatorMetrics.getCollectorRegistry());
@@ -254,7 +255,7 @@ public class CoordinatorServer extends ReconfigurableBase {
}
/** Await termination on the main thread since the grpc library uses daemon
threads. */
- private void blockUntilShutdown() throws InterruptedException {
+ protected void blockUntilShutdown() throws InterruptedException {
server.blockUntilShutdown();
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/ApplicationRequest.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/ApplicationRequest.java
new file mode 100644
index 00000000..d3d75add
--- /dev/null
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/ApplicationRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.request;
+
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Request body for the applications query: an optional set of applicationIds, paging settings
+ * (page size defaults to 10, current page defaults to 1), an optional heartbeat time window and
+ * an optional applicationId regular expression.
+ */
+public class ApplicationRequest {
+  private Set<String> applications;
+  private int pageSize = 10;
+  private int currentPage = 1;
+  private String heartBeatStartTime;
+  private String heartBeatEndTime;
+  private String appIdRegex;
+
+  public Set<String> getApplications() {
+    return applications;
+  }
+
+  public void setApplications(Set<String> applications) {
+    this.applications = applications;
+  }
+
+  public int getPageSize() {
+    return pageSize;
+  }
+
+  public void setPageSize(int pageSize) {
+    this.pageSize = pageSize;
+  }
+
+  public int getCurrentPage() {
+    return currentPage;
+  }
+
+  public void setCurrentPage(int currentPage) {
+    this.currentPage = currentPage;
+  }
+
+  public String getHeartBeatStartTime() {
+    return heartBeatStartTime;
+  }
+
+  public void setHeartBeatStartTime(String heartBeatStartTime) {
+    this.heartBeatStartTime = heartBeatStartTime;
+  }
+
+  public String getHeartBeatEndTime() {
+    return heartBeatEndTime;
+  }
+
+  public void setHeartBeatEndTime(String heartBeatEndTime) {
+    this.heartBeatEndTime = heartBeatEndTime;
+  }
+
+  public String getAppIdRegex() {
+    return appIdRegex;
+  }
+
+  public void setAppIdRegex(String appIdRegex) {
+    this.appIdRegex = appIdRegex;
+  }
+
+  /** Renders every request field so that a logged request shows all active filters. */
+  @Override
+  public String toString() {
+    return "ApplicationRequest{"
+        + "applications="
+        + StringUtils.join(applications, ",")
+        + ", pageSize="
+        + pageSize
+        + ", currentPage="
+        + currentPage
+        + ", heartBeatStartTime='"
+        + heartBeatStartTime
+        + '\''
+        + ", heartBeatEndTime='"
+        + heartBeatEndTime
+        + '\''
+        + ", appIdRegex='"
+        + appIdRegex
+        + '\''
+        + '}';
+  }
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
index 6d58e7b6..e3e21bec 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
@@ -19,7 +19,9 @@ package org.apache.uniffle.coordinator.web.resource;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import javax.servlet.ServletContext;
@@ -33,10 +35,13 @@ import org.apache.hbase.thirdparty.javax.ws.rs.QueryParam;
import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
+import org.apache.uniffle.common.Application;
import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.ClusterManager;
import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.coordinator.web.Response;
+import org.apache.uniffle.coordinator.web.request.ApplicationRequest;
import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
@@ -44,6 +49,12 @@ import
org.apache.uniffle.coordinator.web.request.DecommissionRequest;
public class ServerResource extends BaseResource {
@Context protected ServletContext servletContext;
+ @GET
+ @Path("/status")
+ public Response<String> status() {
+ return execute(() -> "success"); // fixed payload; the REST test polls this endpoint to detect startup
+ }
+
@GET
@Path("/nodes/{id}")
public Response<ServerNode> node(@PathParam("id") String id) {
@@ -121,7 +132,43 @@ public class ServerResource extends BaseResource {
});
}
+  /**
+   * Lists applications that satisfy the filters carried by the request body.
+   *
+   * <p>A missing body produces a failure response. Any exception raised while querying is
+   * reported as a failure response carrying the exception message.
+   *
+   * @param params filters and paging settings.
+   * @return success response wrapping the matching applications, or a failure response.
+   */
+  @POST
+  @Path("/applications")
+  @Produces({MediaType.APPLICATION_JSON})
+  public Response<Object> application(ApplicationRequest params) {
+    if (params == null) {
+      return Response.fail("ApplicationRequest Is not null");
+    }
+
+    // An absent or empty id set means "no id filter"; hand the manager an empty set then.
+    final Set<String> requestedIds =
+        CollectionUtils.isNotEmpty(params.getApplications())
+            ? params.getApplications()
+            : new HashSet<>();
+
+    try {
+      List<Application> found =
+          getApplicationManager()
+              .getApplications(
+                  requestedIds,
+                  params.getPageSize(),
+                  params.getCurrentPage(),
+                  params.getHeartBeatStartTime(),
+                  params.getHeartBeatEndTime(),
+                  params.getAppIdRegex());
+      return Response.success(found);
+    } catch (Exception e) {
+      return Response.fail(e.getMessage());
+    }
+  }
+
private ClusterManager getClusterManager() { // servlet-context attribute keyed by the class's canonical name
return (ClusterManager)
servletContext.getAttribute(ClusterManager.class.getCanonicalName());
}
+
+ private ApplicationManager getApplicationManager() { // servlet-context attribute keyed by the class's canonical name
+ return (ApplicationManager)
+
servletContext.getAttribute(ApplicationManager.class.getCanonicalName());
+ }
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/GenericTestUtils.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/GenericTestUtils.java
new file mode 100644
index 00000000..26e6eb9a
--- /dev/null
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/GenericTestUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Objects;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Time;
+
+/** Test helpers for polling a condition until it holds or a timeout elapses. */
+public class GenericTestUtils {
+
+  public static final String ERROR_MISSING_ARGUMENT =
+      "Input supplier interface should be initialized";
+  public static final String ERROR_INVALID_ARGUMENT =
+      "Total wait time should be greater than check interval time";
+
+  /**
+   * Same as {@link #waitFor(Supplier, long, long, String)} without an extra error message.
+   *
+   * @param check condition to poll.
+   * @param checkEveryMillis pause between two checks, in milliseconds.
+   * @param waitForMillis total time to wait, in milliseconds.
+   * @throws TimeoutException if the condition is still false once the wait time has elapsed.
+   * @throws InterruptedException if the thread is interrupted while sleeping.
+   */
+  public static void waitFor(
+      final Supplier<Boolean> check, final long checkEveryMillis, final long waitForMillis)
+      throws TimeoutException, InterruptedException {
+    waitFor(check, checkEveryMillis, waitForMillis, null);
+  }
+
+  /**
+   * Polls the condition until it returns true or the wait time is used up. The condition is
+   * always evaluated at least once.
+   *
+   * @param check condition to poll; must not be null.
+   * @param checkEveryMillis pause between two checks, in milliseconds.
+   * @param waitForMillis total time to wait, in milliseconds; must not be smaller than
+   *     checkEveryMillis.
+   * @param errorMsg optional text appended to the timeout message; may be null or empty.
+   * @throws NullPointerException if check is null.
+   * @throws IllegalArgumentException if waitForMillis is smaller than checkEveryMillis.
+   * @throws TimeoutException if the condition is still false once the wait time has elapsed.
+   * @throws InterruptedException if the thread is interrupted while sleeping.
+   */
+  public static void waitFor(
+      final Supplier<Boolean> check,
+      final long checkEveryMillis,
+      final long waitForMillis,
+      final String errorMsg)
+      throws TimeoutException, InterruptedException {
+    Objects.requireNonNull(check, ERROR_MISSING_ARGUMENT);
+    if (waitForMillis < checkEveryMillis) {
+      throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
+    }
+
+    long st = Time.monotonicNow();
+    boolean result = check.get();
+
+    while (!result && (Time.monotonicNow() - st < waitForMillis)) {
+      Thread.sleep(checkEveryMillis);
+      result = check.get();
+    }
+
+    if (!result) {
+      final String exceptionErrorMsg =
+          "Timed out waiting for condition. "
+              + (StringUtils.isNotEmpty(errorMsg) ? "Error Message: " + errorMsg : "");
+      throw new TimeoutException(exceptionErrorMsg);
+    }
+  }
+}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/web/CoordinatorTestServer.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/web/CoordinatorTestServer.java
new file mode 100644
index 00000000..1b99ec80
--- /dev/null
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/web/CoordinatorTestServer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web;
+
+import picocli.CommandLine;
+
+import org.apache.uniffle.common.Arguments;
+import org.apache.uniffle.common.config.ReconfigurableBase;
+import org.apache.uniffle.coordinator.ApplicationManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+
+/**
+ * This class is mainly used to simulate CoordinatorServer. When initializing
Coordinator, we will
+ * simulate and initialize some other properties, such as Applications.
+ */
+public class CoordinatorTestServer extends CoordinatorServer {
+ public CoordinatorTestServer(CoordinatorConf coordinatorConf) throws
Exception {
+ super(coordinatorConf);
+ generateApplicationList(2000);
+ }
+
+ private void generateApplicationList(int count) {
+ for (int i = 0; i < count; i++) {
+ ApplicationManager applicationManager = getApplicationManager();
+ applicationManager.registerApplicationInfo("application_" + i, "test");
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Arguments arguments = new Arguments();
+ CommandLine commandLine = new CommandLine(arguments);
+ commandLine.parseArgs(args);
+ String configFile = arguments.getConfigFile();
+
+ // Load configuration from config files
+ final CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);
+
+ coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME,
configFile);
+
+ // Start the coordinator service
+ final CoordinatorTestServer coordinatorServer = new
CoordinatorTestServer(coordinatorConf);
+
+ coordinatorServer.start();
+ coordinatorServer.blockUntilShutdown();
+ }
+}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
new file mode 100644
index 00000000..53df37d9
--- /dev/null
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Runs a class in a separate Java process for testing purposes. Starting the coordinator this
+ * way lets a test access its REST interface over real HTTP.
+ */
+public class UniffleJavaProcess {
+
+  private Process process;
+  private static final String JAVA_HOME = "java.home";
+  private static final String JAVA_CLASS_PATH = "java.class.path";
+
+  /**
+   * Starts the given class in a child JVM without extra classpath entries.
+   *
+   * @param clazz class whose main method is launched.
+   * @param output file receiving the child's stdout and stderr.
+   */
+  public UniffleJavaProcess(Class<?> clazz, File output) throws IOException, InterruptedException {
+    this(clazz, null, output);
+  }
+
+  /**
+   * Starts the given class in a child JVM. The child uses the java binary and classpath of the
+   * current JVM, plus ./src/test/resources and any additional entries, and is launched with
+   * {@code --conf ./src/test/resources/coordinator.conf}.
+   *
+   * @param clazz class whose main method is launched.
+   * @param addClassPaths additional classpath entries; may be null.
+   * @param output file receiving the child's stdout and stderr.
+   * @throws IOException if the process cannot be started.
+   */
+  public UniffleJavaProcess(Class<?> clazz, List<String> addClassPaths, File output)
+      throws IOException, InterruptedException {
+    final String javaHome = System.getProperty(JAVA_HOME);
+    final String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
+
+    // Every appended entry needs its own path separator; without it the new entry is glued
+    // onto the last existing one and both become unusable.
+    StringBuilder classpath = new StringBuilder(System.getProperty(JAVA_CLASS_PATH));
+    classpath.append(File.pathSeparator).append("./src/test/resources");
+    if (addClassPaths != null) {
+      for (String addClasspath : addClassPaths) {
+        classpath.append(File.pathSeparator).append(addClasspath);
+      }
+    }
+    String className = clazz.getCanonicalName();
+    System.out.println("className:" + className);
+    String coordinator = "./src/test/resources/coordinator.conf";
+    ProcessBuilder builder =
+        new ProcessBuilder(
+            javaBin, "-cp", classpath.toString(), className, "--conf", coordinator);
+    builder.redirectInput(ProcessBuilder.Redirect.INHERIT);
+    // Merge stderr into stdout so the log file is written through a single handle; two
+    // independent redirects to the same file would overwrite each other's output.
+    builder.redirectErrorStream(true);
+    builder.redirectOutput(output);
+    process = builder.start();
+  }
+
+  /** Terminates the child process, if one was started, and waits until it has exited. */
+  public void stop() throws InterruptedException {
+    if (process != null) {
+      process.destroy();
+      process.waitFor();
+    }
+  }
+}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleServicesRESTTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleServicesRESTTest.java
new file mode 100644
index 00000000..95e40332
--- /dev/null
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleServicesRESTTest.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Type;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.Application;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.GenericTestUtils;
+import org.apache.uniffle.coordinator.web.request.ApplicationRequest;
+
+import static org.apache.uniffle.coordinator.web.Response.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class UniffleServicesRESTTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(UniffleServicesRESTTest.class);
+ public static final String SYSPROP_TEST_DATA_DIR = "test.build.data";
+ public static final String DEFAULT_TEST_DATA_DIR =
+ "target" + File.separator + "test" + File.separator + "data";
+ public static final String COORDINATOR_CONF_PATH =
"./src/test/resources/coordinator.conf";
+ public static final String APPLICATIONS = "/api/server/applications";
+ private static UniffleJavaProcess coordinatorServer;
+ private static String coordinatorAddress;
+ private static CoordinatorConf conf;
+
+  /**
+   * Loads the coordinator configuration, launches the test coordinator in a child process whose
+   * output goes to a log file under the test data directory, and waits for its web endpoint.
+   */
+  @BeforeAll
+  public static void setUp() throws Exception {
+    conf = new CoordinatorConf(COORDINATOR_CONF_PATH);
+    coordinatorAddress = getCoordinatorWebAppURLWithScheme(conf);
+
+    final File processDir = getTestDir("processes");
+    processDir.mkdirs();
+    final File logFile =
+        new File(processDir, UniffleServicesRESTTest.class.getSimpleName() + "-coordinator.log");
+    logFile.createNewFile();
+
+    coordinatorServer = new UniffleJavaProcess(CoordinatorTestServer.class, logFile);
+    waitUniffleCoordinatorWebRunning(coordinatorAddress, "/api/server/status");
+  }
+
+  /** Stops the coordinator child process if it was started. */
+  @AfterAll
+  public static void shutdown() throws Exception {
+    if (coordinatorServer == null) {
+      return;
+    }
+    coordinatorServer.stop();
+  }
+
+ public static File getTestDir(String subdir) {
+ return new File(getTestDir(), subdir).getAbsoluteFile(); // resolved under the base test dir; not created here
+ }
+
+  /**
+   * Returns the base directory for test data, creating it when necessary. The location comes
+   * from the {@code test.build.data} system property and falls back to the default directory
+   * when the property is unset or empty.
+   */
+  public static File getTestDir() {
+    final String configured = System.getProperty(SYSPROP_TEST_DATA_DIR, DEFAULT_TEST_DATA_DIR);
+    final File dir =
+        new File(configured.isEmpty() ? DEFAULT_TEST_DATA_DIR : configured).getAbsoluteFile();
+    dir.mkdirs();
+    assertExists(dir);
+    return dir;
+  }
+
+  /**
+   * Wait for Coordinator Web to start. The status URL is polled once per second for up to 20
+   * seconds until its JSON response carries {@code "data": "success"}.
+   *
+   * @param address Coordinator WebAddress.
+   * @param path Accessed URL path.
+   * @throws AssertionError if the web application does not come up in time.
+   */
+  private static void waitUniffleCoordinatorWebRunning(final String address, final String path) {
+    try {
+      GenericTestUtils.waitFor(
+          () -> {
+            try {
+              String response = sendGET(address + path);
+              if (StringUtils.isNotBlank(response)) {
+                Gson gson = new Gson();
+                Type type = new TypeToken<Map<String, Object>>() {}.getType();
+                Map<String, Object> map = gson.fromJson(response, type);
+                // process is up and running once the status payload says so
+                return "success".equals(map.getOrDefault("data", "failed"));
+              }
+            } catch (Exception ignored) {
+              // process is not up and running yet; keep polling
+            }
+            return false;
+          },
+          1000,
+          20 * 1000);
+    } catch (Exception e) {
+      // The statically imported fail(...) in this file is Response.fail, which only builds a
+      // response object and never aborts the test, so raise an assertion failure explicitly.
+      throw new AssertionError("Web app not running", e);
+    }
+  }
+
+ public static void assertExists(File f) {
+ assertTrue(f.exists(), "File " + f + " should exist"); // JUnit assertion; fails the test when f is missing
+ }
+
+  /**
+   * Provide the URL to get the return result of the GET request.
+   *
+   * @param getURL url.
+   * @return The response body with line breaks removed, or an empty string when the response
+   *     code is not 200.
+   * @throws IOException an I/O exception of some sort has occurred.
+   */
+  private static String sendGET(String getURL) throws IOException {
+    HttpURLConnection con = (HttpURLConnection) new URL(getURL).openConnection();
+    con.setRequestMethod("GET");
+    con.setRequestProperty("User-Agent", "User-Agent");
+    int responseCode = con.getResponseCode();
+    LOG.info("GET Response Code : {}", responseCode);
+
+    if (responseCode != HttpURLConnection.HTTP_OK) {
+      LOG.warn("GET request did not work.");
+      return "";
+    }
+
+    // success: read the body; try-with-resources closes the reader even if reading fails, and
+    // the charset is explicit so the result does not depend on the platform default.
+    StringBuilder response = new StringBuilder();
+    try (BufferedReader in =
+        new BufferedReader(
+            new InputStreamReader(
+                con.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
+      String inputLine;
+      while ((inputLine = in.readLine()) != null) {
+        response.append(inputLine);
+      }
+    }
+    return response.toString();
+  }
+
+  /**
+   * Sends the given JSON text as the body of a POST request and returns the response body.
+   *
+   * @param postURL url.
+   * @param postParams request body; sent with Content-Type application/json.
+   * @return The response body with line breaks removed, or an empty string when the response
+   *     code is not 200.
+   * @throws IOException an I/O exception of some sort has occurred.
+   */
+  private static String sendPOST(String postURL, String postParams) throws IOException {
+    HttpURLConnection con = (HttpURLConnection) new URL(postURL).openConnection();
+    con.setRequestMethod("POST");
+    con.setRequestProperty("User-Agent", "User-Agent");
+    con.setRequestProperty("Content-Type", "application/json");
+
+    con.setDoOutput(true);
+    // try-with-resources closes the stream even if the write fails; the charset is explicit so
+    // the bytes sent do not depend on the platform default.
+    try (OutputStream os = con.getOutputStream()) {
+      os.write(postParams.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+      os.flush();
+    }
+
+    int responseCode = con.getResponseCode();
+    LOG.info("POST Response Code : {}", responseCode);
+
+    if (responseCode != HttpURLConnection.HTTP_OK) {
+      LOG.warn("POST request did not work.");
+      return "";
+    }
+
+    StringBuilder response = new StringBuilder();
+    try (BufferedReader in =
+        new BufferedReader(
+            new InputStreamReader(
+                con.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
+      String inputLine;
+      while ((inputLine = in.readLine()) != null) {
+        response.append(inputLine);
+      }
+    }
+    return response.toString();
+  }
+
+ /**
+  * Builds the http URL of the coordinator web app on localhost.
+  *
+  * @param conf configuration from which {@code RssBaseConf.JETTY_HTTP_PORT} is read.
+  * @return {@code "http://localhost:"} followed by the configured port.
+  */
+ public static String getCoordinatorWebAppURLWithScheme(CoordinatorConf conf) {
+   final String localhostPrefix = "http://localhost:";
+   return localhostPrefix + conf.getInteger(RssBaseConf.JETTY_HTTP_PORT);
+ }
+
+ class ApplicationDatas {
+ private int code;
+ private List<Application> data;
+ private String errMsg;
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public List<Application> getData() {
+ return data;
+ }
+
+ public void setData(List<Application> data) {
+ this.data = data;
+ }
+
+ public String getErrMsg() {
+ return errMsg;
+ }
+
+ public void setErrMsg(String errMsg) {
+ this.errMsg = errMsg;
+ }
+ }
+
+ @Test
+ public void testGetApplications() throws Exception {
+   // Filter condition 1: an explicit collection of application ids.
+   // Three ids are sent, and the Coordinator is expected to answer with
+   // the information of these three applications.
+   final String[] requestedIds = {"application_1", "application_2", "application_3"};
+   final Set<String> applications = new HashSet<>();
+   for (String requestedId : requestedIds) {
+     applications.add(requestedId);
+   }
+
+   final ApplicationRequest request = new ApplicationRequest();
+   request.setApplications(applications);
+
+   final Gson gson = new Gson();
+   final String body = gson.toJson(request);
+   final String response = sendPOST(coordinatorAddress + APPLICATIONS, body);
+   assertNotNull(response);
+
+   final ApplicationDatas parsed = gson.fromJson(response, ApplicationDatas.class);
+   assertNotNull(parsed);
+
+   final List<Application> returned = parsed.getData();
+   assertNotNull(returned);
+   assertEquals(3, returned.size());
+
+   // Every returned application must be one of the requested ids.
+   returned.forEach(app -> assertTrue(applications.contains(app.getApplicationId())));
+ }
+
+ @Test
+ public void testGetApplicationsWithNoFilter() throws Exception {
+   // No filter condition is set on the request; 10 records are expected
+   // back from the coordinator.
+   final Gson gson = new Gson();
+   final String body = gson.toJson(new ApplicationRequest());
+
+   final String response = sendPOST(coordinatorAddress + APPLICATIONS, body);
+   assertNotNull(response);
+
+   final ApplicationDatas parsed = gson.fromJson(response, ApplicationDatas.class);
+   assertNotNull(parsed);
+
+   final List<Application> returned = parsed.getData();
+   assertNotNull(returned);
+   assertEquals(10, returned.size());
+
+   // The result set is sorted, so the expected applications are
+   // application_[0,1,10,100,1000,1001,1002,1003,1004,1005].
+   final String[] expectedIds = {
+     "application_0",
+     "application_1",
+     "application_10",
+     "application_100",
+     "application_1000",
+     "application_1001",
+     "application_1002",
+     "application_1003",
+     "application_1004",
+     "application_1005"
+   };
+   final Set<String> applications = new HashSet<>();
+   for (String expectedId : expectedIds) {
+     applications.add(expectedId);
+   }
+
+   returned.forEach(app -> assertTrue(applications.contains(app.getApplicationId())));
+ }
+
+ @Test
+ public void testGetApplicationsWithAppRegex() throws Exception {
+   // Exercises the regular-expression filter: only application ids that
+   // contain 1000 should be matched.
+   final ApplicationRequest request = new ApplicationRequest();
+   request.setAppIdRegex(".*1000.*");
+
+   final Gson gson = new Gson();
+   final String body = gson.toJson(request);
+   final String response = sendPOST(coordinatorAddress + APPLICATIONS, body);
+   assertNotNull(response);
+
+   final ApplicationDatas parsed = gson.fromJson(response, ApplicationDatas.class);
+   assertNotNull(parsed);
+
+   final List<Application> matches = parsed.getData();
+   assertNotNull(matches);
+   assertEquals(1, matches.size());
+
+   // The single match is checked for both its id and its user.
+   final Application onlyMatch = matches.get(0);
+   assertNotNull(onlyMatch);
+   assertEquals("application_1000", onlyMatch.getApplicationId());
+   assertEquals("test", onlyMatch.getUser());
+ }
+
+ @Test
+ public void testGetApplicationsPage() throws Exception {
+   // Paging: request page 2 with a page size of 20 records.
+   final ApplicationRequest request = new ApplicationRequest();
+   request.setCurrentPage(2);
+   request.setPageSize(20);
+
+   final Gson gson = new Gson();
+   final String body = gson.toJson(request);
+   final String response = sendPOST(coordinatorAddress + APPLICATIONS, body);
+   assertNotNull(response);
+
+   final ApplicationDatas parsed = gson.fromJson(response, ApplicationDatas.class);
+   assertNotNull(parsed);
+
+   final List<Application> returned = parsed.getData();
+   assertNotNull(returned);
+   assertEquals(20, returned.size());
+
+   // Expected content of the page:
+   //   5 records:  application_[1015,1016,1017,1018,1019]
+   //   11 records: application_[102,1020,1021,1022,1023,1024,1025,1026,1027,1028,1029]
+   //   4 records:  application_[103,1030,1031,1032]
+   final String[] expectedIds = {
+     "application_1015",
+     "application_1016",
+     "application_1017",
+     "application_1018",
+     "application_1019",
+     "application_102",
+     "application_1020",
+     "application_1021",
+     "application_1022",
+     "application_1023",
+     "application_1024",
+     "application_1025",
+     "application_1026",
+     "application_1027",
+     "application_1028",
+     "application_1029",
+     "application_103",
+     "application_1030",
+     "application_1031",
+     "application_1032"
+   };
+   final Set<String> applications = new HashSet<>();
+   for (String expectedId : expectedIds) {
+     applications.add(expectedId);
+   }
+
+   returned.forEach(app -> assertTrue(applications.contains(app.getApplicationId())));
+ }
+
+ @Test
+ public void testGetApplicationsWithNull() throws Exception {
+   // In this test case, the request object itself is null.
+   // The coordinator is expected to reject it with code -1 and an
+   // explanatory error message instead of returning application data.
+   // The variable stays typed as ApplicationRequest so that the
+   // toJson(Object) overload is the one that gets called.
+   final ApplicationRequest request = null;
+   Gson gson = new Gson();
+   String params = gson.toJson(request);
+
+   String response = sendPOST(coordinatorAddress + APPLICATIONS, params);
+   assertNotNull(response);
+
+   ApplicationDatas dataModel = gson.fromJson(response, ApplicationDatas.class);
+   assertNotNull(dataModel);
+   // Read through the getters, as the other tests in this class do.
+   assertEquals(-1, dataModel.getCode());
+   assertEquals("ApplicationRequest Is not null", dataModel.getErrMsg());
+ }
+
+ @Test
+ public void testGetApplicationsWithStartTimeAndEndTime() throws Exception {
+   // In this test case, two heartBeatStartTime/heartBeatEndTime windows are used.
+   // Window 1 starts "now" and ends 100 ms later: no data is expected.
+   // Window 2 starts at 0 and has the same end: 10 records are expected.
+   long startTime = System.currentTimeMillis();
+   long endTime = startTime + 100;
+
+   final ApplicationRequest request = new ApplicationRequest();
+   request.setHeartBeatStartTime(String.valueOf(startTime));
+   request.setHeartBeatEndTime(String.valueOf(endTime));
+
+   Gson gson = new Gson();
+   String params = gson.toJson(request);
+   String response = sendPOST(coordinatorAddress + APPLICATIONS, params);
+   assertNotNull(response);
+
+   ApplicationDatas dataModel = gson.fromJson(response, ApplicationDatas.class);
+   assertNotNull(dataModel);
+   List<Application> datas = dataModel.getData();
+   assertNotNull(datas);
+   assertEquals(0, datas.size());
+
+   startTime = 0;
+   final ApplicationRequest request2 = new ApplicationRequest();
+   request2.setHeartBeatStartTime(String.valueOf(startTime));
+   request2.setHeartBeatEndTime(String.valueOf(endTime));
+
+   String params2 = gson.toJson(request2);
+   String response2 = sendPOST(coordinatorAddress + APPLICATIONS, params2);
+   assertNotNull(response2);
+   ApplicationDatas dataModel2 = gson.fromJson(response2, ApplicationDatas.class);
+   assertNotNull(dataModel2);
+   List<Application> datas2 = dataModel2.getData();
+   // Guard before size(), so a missing data list is reported as an
+   // assertion failure rather than a NullPointerException.
+   assertNotNull(datas2);
+   assertEquals(10, datas2.size());
+ }
+}