Hisoka-X commented on code in PR #5950: URL: https://github.com/apache/seatunnel/pull/5950#discussion_r1418212785
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
@Test
public void testGetRunningJobById() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOB_URL
- + "/"
- + clientJobProxy.getJobId())
- .then()
- .statusCode(200)
- .body("jobName", equalTo("fake_to_file"))
- .body("jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
Review Comment:
```suggestion
instance -> {
```
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
@Test
public void testGetRunningJobById() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOB_URL
- + "/"
- + clientJobProxy.getJobId())
- .then()
- .statusCode(200)
- .body("jobName", equalTo("fake_to_file"))
- .body("jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOB_URL
+ + "/"
+ +
clientJobProxy.getJobId())
+ .then()
+ .statusCode(200)
+ .body("jobName", equalTo("fake_to_file"))
+ .body("jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testGetRunningJobs() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOBS_URL)
- .then()
- .statusCode(200)
- .body("[0].jobName", equalTo("fake_to_file"))
- .body("[0].jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
Review Comment:
ditto
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java:
##########
@@ -92,7 +95,25 @@ public void handle(HttpPostCommand httpPostCommand) {
private SeaTunnelServer getSeaTunnelServer() {
Map<String, Object> extensionServices =
this.textCommandService.getNode().getNodeExtension().createExtensionServices();
- return (SeaTunnelServer)
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+ SeaTunnelServer seaTunnelServer =
+ (SeaTunnelServer)
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+ if (!seaTunnelServer.isMasterNode()) {
+ for (HazelcastInstance hazelcastInstance :
Hazelcast.getAllHazelcastInstances()) {
+ SeaTunnelServer seaTunnelServer1 =
Review Comment:
ditto
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
@Test
public void testGetRunningJobById() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOB_URL
- + "/"
- + clientJobProxy.getJobId())
- .then()
- .statusCode(200)
- .body("jobName", equalTo("fake_to_file"))
- .body("jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOB_URL
+ + "/"
+ +
clientJobProxy.getJobId())
+ .then()
+ .statusCode(200)
+ .body("jobName", equalTo("fake_to_file"))
+ .body("jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testGetRunningJobs() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOBS_URL)
- .then()
- .statusCode(200)
- .body("[0].jobName", equalTo("fake_to_file"))
- .body("[0].jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOBS_URL)
+ .then()
+ .statusCode(200)
+ .body("[0].jobName",
equalTo("fake_to_file"))
+ .body("[0].jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testSystemMonitoringInformation() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.SYSTEM_MONITORING_INFORMATION)
- .then()
- .assertThat()
- .time(lessThan(5000L))
- .statusCode(200);
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.SYSTEM_MONITORING_INFORMATION)
+ .then()
+ .assertThat()
+ .time(lessThan(5000L))
+ .statusCode(200);
+ });
}
@Test
public void testSubmitJob() {
- Response response = submitJob("BATCH");
- response.then().statusCode(200).body("jobName", equalTo("test测试"));
- String jobId = response.getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer =
- (SeaTunnelServer)
- hazelcastInstance
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.FINISHED,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ Response response = submitJob(hazelcastInstance2,
"BATCH");
+ response.then().statusCode(200).body("jobName",
equalTo("test测试"));
+ String jobId =
response.getBody().jsonPath().getString("jobId");
+ SeaTunnelServer seaTunnelServer = null;
+
+ for (HazelcastInstance hazelcastInstance :
+ Hazelcast.getAllHazelcastInstances()) {
+ SeaTunnelServer seaTunnelServer1 =
+ (SeaTunnelServer)
+ ((HazelcastInstanceProxy)
hazelcastInstance)
+ .getOriginal()
+ .node
+ .getNodeExtension()
+
.createExtensionServices()
+
.get(Constant.SEATUNNEL_SERVICE_NAME);
+
+ if (seaTunnelServer1.isMasterNode()) {
+ seaTunnelServer = seaTunnelServer1;
+ }
+ }
+
+ SeaTunnelServer finalSeaTunnelServer =
seaTunnelServer;
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
+
finalSeaTunnelServer
+
.getCoordinatorService()
+
.getJobStatus(
+
Long.parseLong(
+
jobId))));
+ });
}
@Test
public void testStopJob() {
- String jobId =
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer =
- (SeaTunnelServer)
- hazelcastInstance
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
-
- String parameters = "{" + "\"jobId\":" + jobId + "," +
"\"isStopWithSavePoint\":true}";
-
- given().body(parameters)
- .post(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.STOP_JOB_URL)
- .then()
- .statusCode(200)
- .body("jobId", equalTo(jobId));
- Awaitility.await()
- .atMost(6, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.FINISHED,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ String jobId =
+ submitJob(hazelcastInstance2, "STREAMING")
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+ SeaTunnelServer seaTunnelServer = null;
- String jobId2 =
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
+ for (HazelcastInstance hazelcastInstance :
+ Hazelcast.getAllHazelcastInstances()) {
+ SeaTunnelServer seaTunnelServer1 =
Review Comment:
ditto
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
@Test
public void testGetRunningJobById() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOB_URL
- + "/"
- + clientJobProxy.getJobId())
- .then()
- .statusCode(200)
- .body("jobName", equalTo("fake_to_file"))
- .body("jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOB_URL
+ + "/"
+ +
clientJobProxy.getJobId())
+ .then()
+ .statusCode(200)
+ .body("jobName", equalTo("fake_to_file"))
+ .body("jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testGetRunningJobs() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOBS_URL)
- .then()
- .statusCode(200)
- .body("[0].jobName", equalTo("fake_to_file"))
- .body("[0].jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOBS_URL)
+ .then()
+ .statusCode(200)
+ .body("[0].jobName",
equalTo("fake_to_file"))
+ .body("[0].jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testSystemMonitoringInformation() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.SYSTEM_MONITORING_INFORMATION)
- .then()
- .assertThat()
- .time(lessThan(5000L))
- .statusCode(200);
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
Review Comment:
ditto
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
@Test
public void testGetRunningJobById() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOB_URL
- + "/"
- + clientJobProxy.getJobId())
- .then()
- .statusCode(200)
- .body("jobName", equalTo("fake_to_file"))
- .body("jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOB_URL
+ + "/"
+ +
clientJobProxy.getJobId())
+ .then()
+ .statusCode(200)
+ .body("jobName", equalTo("fake_to_file"))
+ .body("jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testGetRunningJobs() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOBS_URL)
- .then()
- .statusCode(200)
- .body("[0].jobName", equalTo("fake_to_file"))
- .body("[0].jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOBS_URL)
+ .then()
+ .statusCode(200)
+ .body("[0].jobName",
equalTo("fake_to_file"))
+ .body("[0].jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testSystemMonitoringInformation() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.SYSTEM_MONITORING_INFORMATION)
- .then()
- .assertThat()
- .time(lessThan(5000L))
- .statusCode(200);
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.SYSTEM_MONITORING_INFORMATION)
+ .then()
+ .assertThat()
+ .time(lessThan(5000L))
+ .statusCode(200);
+ });
}
@Test
public void testSubmitJob() {
- Response response = submitJob("BATCH");
- response.then().statusCode(200).body("jobName", equalTo("test测试"));
- String jobId = response.getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer =
- (SeaTunnelServer)
- hazelcastInstance
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.FINISHED,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ Response response = submitJob(hazelcastInstance2,
"BATCH");
+ response.then().statusCode(200).body("jobName",
equalTo("test测试"));
+ String jobId =
response.getBody().jsonPath().getString("jobId");
+ SeaTunnelServer seaTunnelServer = null;
+
+ for (HazelcastInstance hazelcastInstance :
+ Hazelcast.getAllHazelcastInstances()) {
+ SeaTunnelServer seaTunnelServer1 =
+ (SeaTunnelServer)
+ ((HazelcastInstanceProxy)
hazelcastInstance)
+ .getOriginal()
+ .node
+ .getNodeExtension()
+
.createExtensionServices()
+
.get(Constant.SEATUNNEL_SERVICE_NAME);
+
+ if (seaTunnelServer1.isMasterNode()) {
+ seaTunnelServer = seaTunnelServer1;
+ }
+ }
+
+ SeaTunnelServer finalSeaTunnelServer =
seaTunnelServer;
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
+
finalSeaTunnelServer
+
.getCoordinatorService()
+
.getJobStatus(
+
Long.parseLong(
+
jobId))));
+ });
}
@Test
public void testStopJob() {
- String jobId =
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer =
- (SeaTunnelServer)
- hazelcastInstance
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
-
- String parameters = "{" + "\"jobId\":" + jobId + "," +
"\"isStopWithSavePoint\":true}";
-
- given().body(parameters)
- .post(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.STOP_JOB_URL)
- .then()
- .statusCode(200)
- .body("jobId", equalTo(jobId));
- Awaitility.await()
- .atMost(6, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.FINISHED,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
Review Comment:
ditto
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
@Test
public void testGetRunningJobById() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOB_URL
- + "/"
- + clientJobProxy.getJobId())
- .then()
- .statusCode(200)
- .body("jobName", equalTo("fake_to_file"))
- .body("jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOB_URL
+ + "/"
+ +
clientJobProxy.getJobId())
+ .then()
+ .statusCode(200)
+ .body("jobName", equalTo("fake_to_file"))
+ .body("jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testGetRunningJobs() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOBS_URL)
- .then()
- .statusCode(200)
- .body("[0].jobName", equalTo("fake_to_file"))
- .body("[0].jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOBS_URL)
+ .then()
+ .statusCode(200)
+ .body("[0].jobName",
equalTo("fake_to_file"))
+ .body("[0].jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testSystemMonitoringInformation() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.SYSTEM_MONITORING_INFORMATION)
- .then()
- .assertThat()
- .time(lessThan(5000L))
- .statusCode(200);
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.SYSTEM_MONITORING_INFORMATION)
+ .then()
+ .assertThat()
+ .time(lessThan(5000L))
+ .statusCode(200);
+ });
}
@Test
public void testSubmitJob() {
- Response response = submitJob("BATCH");
- response.then().statusCode(200).body("jobName", equalTo("test测试"));
- String jobId = response.getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer =
- (SeaTunnelServer)
- hazelcastInstance
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.FINISHED,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ Response response = submitJob(hazelcastInstance2,
"BATCH");
+ response.then().statusCode(200).body("jobName",
equalTo("test测试"));
+ String jobId =
response.getBody().jsonPath().getString("jobId");
+ SeaTunnelServer seaTunnelServer = null;
+
+ for (HazelcastInstance hazelcastInstance :
+ Hazelcast.getAllHazelcastInstances()) {
+ SeaTunnelServer seaTunnelServer1 =
Review Comment:
```suggestion
SeaTunnelServer server =
```
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java:
##########
@@ -83,242 +92,323 @@ void beforeClass() throws Exception {
@Test
public void testGetRunningJobById() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOB_URL
- + "/"
- + clientJobProxy.getJobId())
- .then()
- .statusCode(200)
- .body("jobName", equalTo("fake_to_file"))
- .body("jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOB_URL
+ + "/"
+ +
clientJobProxy.getJobId())
+ .then()
+ .statusCode(200)
+ .body("jobName", equalTo("fake_to_file"))
+ .body("jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testGetRunningJobs() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOBS_URL)
- .then()
- .statusCode(200)
- .body("[0].jobName", equalTo("fake_to_file"))
- .body("[0].jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOBS_URL)
+ .then()
+ .statusCode(200)
+ .body("[0].jobName",
equalTo("fake_to_file"))
+ .body("[0].jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testSystemMonitoringInformation() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.SYSTEM_MONITORING_INFORMATION)
- .then()
- .assertThat()
- .time(lessThan(5000L))
- .statusCode(200);
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
+ given().get(
+ HOST
+ + hazelcastInstance2
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.SYSTEM_MONITORING_INFORMATION)
+ .then()
+ .assertThat()
+ .time(lessThan(5000L))
+ .statusCode(200);
+ });
}
@Test
public void testSubmitJob() {
- Response response = submitJob("BATCH");
- response.then().statusCode(200).body("jobName", equalTo("test测试"));
- String jobId = response.getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer =
- (SeaTunnelServer)
- hazelcastInstance
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.FINISHED,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
+ Arrays.asList(node2, node1)
+ .forEach(
+ hazelcastInstance2 -> {
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
