This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9aea5c9546 [Improve][Zeta] engine-server and seatunnel-ui support remote paginated queries (#9951)
9aea5c9546 is described below
commit 9aea5c9546e5540c02c2aa9a78e6833fcf4767c2
Author: liucongjy <[email protected]>
AuthorDate: Fri Jan 30 15:36:44 2026 +0800
[Improve][Zeta] engine-server and seatunnel-ui support remote paginated queries (#9951)
---
docs/en/engines/zeta/rest-api-v2.md | 15 +-
docs/zh/engines/zeta/rest-api-v2.md | 15 +-
.../engine/server/rest/service/JobInfoService.java | 4 +
.../server/rest/servlet/FinishedJobsServlet.java | 4 +-
.../server/rest/servlet/PageBaseServlet.java | 75 ++++++
.../server/rest/servlet/RunningJobsServlet.java | 4 +-
.../engine/server/rest/RestApiHttpsTest.java | 273 ++++++++++++++++++++-
.../server/rest/RestApiRequestCallback.java} | 16 +-
.../seatunnel-engine-ui/src/service/job/index.ts | 6 +-
.../seatunnel-engine-ui/src/service/job/types.ts | 5 +
.../seatunnel-engine-ui/src/tests/jobs.spec.ts | 26 +-
.../src/views/jobs/finished-jobs.tsx | 25 +-
.../src/views/jobs/running-jobs.tsx | 25 +-
13 files changed, 437 insertions(+), 56 deletions(-)
diff --git a/docs/en/engines/zeta/rest-api-v2.md b/docs/en/engines/zeta/rest-api-v2.md
index b3808d35d7..181bf595ff 100644
--- a/docs/en/engines/zeta/rest-api-v2.md
+++ b/docs/en/engines/zeta/rest-api-v2.md
@@ -75,13 +75,18 @@ Please refer [security](security.md)
------------------------------------------------------------------------------------------
-### Returns An Overview And State Of All Jobs
+### Query An Overview And State Of Running Jobs
<details>
- <summary><code>GET</code> <code><b>/running-jobs</b></code> <code>(Returns an overview over all jobs and their current state.)</code></summary>
+ <summary><code>GET</code> <code><b>/running-jobs?page=1&rows=10</b></code> <code>(Query an overview of running jobs and their current state.)</code></summary>
#### Parameters
+> | name | type | data type | description |
+> |-------|----------|-----------|-----------------------------------------------------------------------------------|
+> | page | optional | int | page number. |
+> | rows | optional | int | page size. |
+
#### Responses
```json
@@ -465,16 +470,18 @@ When we can't get the job info, the response will be:
------------------------------------------------------------------------------------------
-### Return All Finished Jobs Info
+### Query Finished Jobs Info
<details>
- <summary><code>GET</code> <code><b>/finished-jobs/:state</b></code> <code>(Return all finished Jobs Info.)</code></summary>
+ <summary><code>GET</code> <code><b>/finished-jobs/:state?page=1&rows=10</b></code> <code>(Query finished Jobs Info.)</code></summary>
#### Parameters
> | name | type | data type | description |
> |-------|----------|-----------|-----------------------------------------------------------------------------------|
> | state | optional | string | finished job status. `FINISHED`,`CANCELED`,`FAILED`,`SAVEPOINT_DONE`,`UNKNOWABLE` |
+> | page | optional | int | page number. |
+> | rows | optional | int | page size. |
#### Responses
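As the new parameter table documents, `page` and `rows` are both optional; judging from the server-side change later in this commit, supplying `page` switches the response to a `{"data": [...], "total": N}` envelope (with `rows` defaulting to 10), while omitting it keeps the original plain-array response. A minimal client sketch of the paginated call, using only JDK classes (the host and port are placeholders, not values defined by this change):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.stream.Collectors;

public class PaginatedJobsClientSketch {
    public static void main(String[] args) throws Exception {
        // First page of finished jobs, 10 rows per page; host and port are illustrative.
        URL url = new URL("http://localhost:8080/finished-jobs?page=1&rows=10");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader in =
                new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String body = in.lines().collect(Collectors.joining());
            // With `page` present the body is an object: {"data": [...], "total": <count>}.
            // Without `page`, the endpoint keeps returning the original JSON array.
            System.out.println(body);
        } finally {
            conn.disconnect();
        }
    }
}
```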
diff --git a/docs/zh/engines/zeta/rest-api-v2.md b/docs/zh/engines/zeta/rest-api-v2.md
index 8bab244208..35e36761d9 100644
--- a/docs/zh/engines/zeta/rest-api-v2.md
+++ b/docs/zh/engines/zeta/rest-api-v2.md
@@ -73,13 +73,18 @@ seatunnel:
------------------------------------------------------------------------------------------
-### Returns an overview of all jobs and their current state
+### Query an overview of jobs and their current state
<details>
- <summary><code>GET</code> <code><b>/running-jobs</b></code> <code>(Returns an overview of all jobs and their current state.)</code></summary>
+ <summary><code>GET</code> <code><b>/running-jobs?page=1&rows=10</b></code> <code>(Query an overview of jobs and their current state.)</code></summary>
#### Parameters
+> | name | required | type | description |
+> |------|------|------|------|
+> | page | no | int | page number |
+> | rows | no | int | rows per page |
+
#### Responses
```json
@@ -444,16 +449,18 @@ seatunnel:
------------------------------------------------------------------------------------------
-### Returns all finished jobs info
+### Query finished jobs info
<details>
- <summary><code>GET</code> <code><b>/finished-jobs/:state</b></code> <code>(Returns all finished jobs info.)</code></summary>
+ <summary><code>GET</code> <code><b>/finished-jobs/:state?page=1&rows=10</b></code> <code>(Query finished jobs info.)</code></summary>
#### Parameters
> | name | required | type | description |
> |-------|----------|--------|-----------------------------------------------------------------------------------|
> | state | optional | string | finished job status. `FINISHED`,`CANCELED`,`FAILED`,`SAVEPOINT_DONE`,`UNKNOWABLE` |
+> | page | no | int | page number |
+> | rows | no | int | rows per page |
#### Responses
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
index 09ac57100d..d3a8eacd6f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
@@ -133,6 +133,10 @@ public class JobInfoService extends BaseService {
IMap<Long, JobInfo> values =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
return values.entrySet().stream()
+ .sorted(
+ Comparator.comparing(
+ entry -> entry.getValue().getInitializationTimestamp(),
+ Comparator.reverseOrder()))
 .map(jobInfoEntry -> convertToJson(jobInfoEntry.getValue(), jobInfoEntry.getKey()))
.collect(JsonArray::new, JsonArray::add, JsonArray::add);
}
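The hunk above makes getRunningJobsJson() emit jobs newest-first by comparing each entry's initialization timestamp with a reversed comparator, so the first page of a paginated listing shows the most recently submitted jobs. A self-contained sketch of that ordering pattern (the JobStub type and timestamp values are invented for illustration; only the Comparator.comparing(..., Comparator.reverseOrder()) shape mirrors the commit):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class NewestFirstSketch {
    /** Stand-in for the JobInfo values kept in the running-job IMap. */
    static class JobStub {
        private final long jobId;
        private final long initializationTimestamp;

        JobStub(long jobId, long initializationTimestamp) {
            this.jobId = jobId;
            this.initializationTimestamp = initializationTimestamp;
        }

        long getJobId() {
            return jobId;
        }

        long getInitializationTimestamp() {
            return initializationTimestamp;
        }
    }

    public static void main(String[] args) {
        List<JobStub> jobs =
                Arrays.asList(new JobStub(1000L, 100L), new JobStub(1001L, 300L), new JobStub(1002L, 200L));
        List<JobStub> newestFirst =
                jobs.stream()
                        // Same shape as the commit: compare the initialization timestamp, reversed.
                        .sorted(
                                Comparator.comparing(
                                        JobStub::getInitializationTimestamp,
                                        Comparator.reverseOrder()))
                        .collect(Collectors.toList());
        newestFirst.forEach(job -> System.out.println(job.getJobId())); // prints 1001, 1002, 1000
    }
}
```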
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/FinishedJobsServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/FinishedJobsServlet.java
index 9f0e4e4c49..dc1a8aefa6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/FinishedJobsServlet.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/FinishedJobsServlet.java
@@ -27,7 +27,7 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
-public class FinishedJobsServlet extends BaseServlet {
+public class FinishedJobsServlet extends PageBaseServlet {
private static final long serialVersionUID = 1L;
@@ -50,6 +50,6 @@ public class FinishedJobsServlet extends BaseServlet {
state = "";
}
- writeJson(resp, jobInfoService.getJobsByStateJson(state));
+ writeJsonWithPagination(req, resp, jobInfoService.getJobsByStateJson(state));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PageBaseServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PageBaseServlet.java
new file mode 100644
index 0000000000..147dfe0100
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/PageBaseServlet.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.servlet;
+
+import com.hazelcast.internal.json.JsonArray;
+import com.hazelcast.internal.json.JsonObject;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class PageBaseServlet extends BaseServlet {
+ private final String pageParam = "page";
+ private final String rowsParam = "rows";
+
+ public PageBaseServlet(NodeEngineImpl nodeEngine) {
+ super(nodeEngine);
+ }
+
+ protected void writeJsonWithPagination(
+ HttpServletRequest req, HttpServletResponse resp, JsonArray jsonArray)
+ throws IOException {
+ int total = jsonArray.size();
+
+ // Fetch the pagination params; if `page` is present, paginate the data.
+ // The paginated response format is: {"data": [], "total": 10}
+ Map<String, String> parameterMap = getParameterMap(req);
+ if (parameterMap != null && parameterMap.containsKey(pageParam)) {
+ int page = Integer.parseInt(parameterMap.get(pageParam));
+ int rows =
+ parameterMap.get(rowsParam) != null
+ ? Integer.parseInt(parameterMap.get(rowsParam))
+ : 10;
+ int start = (page - 1) * rows;
+ if (start > total || page < 1) {
+ throw new IllegalArgumentException(
+ page < 1
+ ? "Page number must be greater than 0"
+ : "Page number exceeds total pages");
+ }
+ JsonArray paginatedArray = new JsonArray();
+ jsonArray
+ .values()
+ .subList(start, Math.min(start + rows, total))
+ .forEach(
+ t -> {
+ paginatedArray.add(t);
+ });
+ JsonObject paginatedObj = new JsonObject();
+ paginatedObj.add("data", paginatedArray);
+ paginatedObj.add("total", total);
+ writeJson(resp, paginatedObj);
+ } else {
+ writeJson(resp, jsonArray);
+ }
+ }
+}
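In writeJsonWithPagination above, the slice starts at (page - 1) * rows and ends at min(start + rows, total); page < 1 or a start index past the end is rejected, and 10 rows are used when `rows` is omitted. A small standalone sketch of just that arithmetic (plain lists instead of Hazelcast's JsonArray; the class and method names here are illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class PaginationSliceSketch {
    /** Same bounds logic as PageBaseServlet.writeJsonWithPagination, applied to a plain list. */
    static <T> List<T> slice(List<T> all, int page, int rows) {
        int total = all.size();
        int start = (page - 1) * rows;
        if (start > total || page < 1) {
            throw new IllegalArgumentException(
                    page < 1
                            ? "Page number must be greater than 0"
                            : "Page number exceeds total pages");
        }
        return all.subList(start, Math.min(start + rows, total));
    }

    public static void main(String[] args) {
        List<Integer> jobs = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // 7 jobs, as in the tests below
        System.out.println(slice(jobs, 1, 5)); // [1, 2, 3, 4, 5] -> data.size() == pageSize
        System.out.println(slice(jobs, 2, 5)); // [6, 7]          -> the remaining 2 jobs
        // slice(jobs, 10, 5) throws "Page number exceeds total pages"; the servlet surfaces
        // that failure, and testPageNumberOutOfRange below expects an HTTP 400 for it.
    }
}
```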
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/RunningJobsServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/RunningJobsServlet.java
index 5eed9bd58c..ab02aad5bb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/RunningJobsServlet.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/RunningJobsServlet.java
@@ -29,7 +29,7 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@Slf4j
-public class RunningJobsServlet extends BaseServlet {
+public class RunningJobsServlet extends PageBaseServlet {
private final JobInfoService jobInfoService;
@@ -42,6 +42,6 @@ public class RunningJobsServlet extends BaseServlet {
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
- writeJson(resp, jobInfoService.getRunningJobsJson());
+ writeJsonWithPagination(req, resp, jobInfoService.getRunningJobsJson());
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpsTest.java
index fe55a70e91..29c3b90b51 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpsTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpsTest.java
@@ -20,7 +20,11 @@ package org.apache.seatunnel.engine.server.rest;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.TestUtils;
@@ -32,6 +36,11 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.internal.json.Json;
+import com.hazelcast.internal.json.JsonArray;
+import com.hazelcast.internal.json.JsonObject;
+import com.hazelcast.internal.serialization.Data;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
@@ -40,13 +49,21 @@ import javax.net.ssl.SSLHandshakeException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
/** Test for Rest API with HTTPS. */
@DisabledOnOs(OS.WINDOWS)
public class RestApiHttpsTest extends AbstractSeaTunnelServerTest {
private static final int HTTP_PORT = 28080;
private static final int HTTPS_PORT = 28443;
+
+ private static final int HTTP_PORT2 = 28088;
+ private static final int HTTPS_PORT2 = 28543;
private static final String SERVER_KEYSTORE_PASSWORD = "server_keystore_password";
private static final String CLIENT_KEYSTORE_PASSWORD = "client_keystore_password";
@@ -81,18 +98,12 @@ public class RestApiHttpsTest extends AbstractSeaTunnelServerTest {
@Test
public void testRestApiHttp() throws Exception {
- HttpURLConnection conn =
- (HttpURLConnection)
- new java.net.URL("http://localhost:" + HTTP_PORT + "/overview")
- .openConnection();
- try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
-
- Assertions.assertEquals(200, conn.getResponseCode());
- String response = in.lines().collect(Collectors.joining());
- Assertions.assertTrue(response.contains("projectVersion"));
- } finally {
- conn.disconnect();
- }
+ restApiRequestHttp(
+ "http://localhost:" + HTTP_PORT + "/overview",
+ (code, content) -> {
+ Assertions.assertEquals(200, code);
+ Assertions.assertTrue(content.contains("projectVersion"));
+ });
}
@Test
@@ -126,4 +137,242 @@ public class RestApiHttpsTest extends AbstractSeaTunnelServerTest {
conn.getResponseCode();
});
}
+
+ @Test
+ public void testFinishedJobsApi() throws Exception {
+ JobInformation jobInformation = getSeatunnelServer("testFinishedJobs");
+ int jobNum = 7;
+ int pageSize = 5;
+ long jobId = 1000L;
+ for (int i = 0; i < jobNum; i++) {
+ startJob(i + jobId, "fake_to_console.conf", jobInformation);
+ }
+
+ // wait until all jobs are finished
+ await().pollDelay(5, TimeUnit.SECONDS)
+ .atMost(30, TimeUnit.SECONDS)
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ assertEquals(
+ jobNum,
+ jobInformation
+ .coordinatorService
+ .getJobCountMetrics()
+ .getFinishedJobCount()));
+
+ // pagination test
+ // page 1
+ restApiRequestHttp(
+ "http://localhost:" + HTTP_PORT2 + "/finished-jobs?page=1&rows=" + pageSize,
+ (code, content) -> {
+ Assertions.assertEquals(200, code);
+ JsonObject resultJson = (JsonObject) Json.parse(content);
+ Assertions.assertTrue(
+ resultJson.get("data") != null && resultJson.get("total") != null);
+ int total = resultJson.getInt("total", 0);
+ JsonArray data = (JsonArray) resultJson.get("data");
+ Assertions.assertTrue(total == jobNum && data.size() == pageSize);
+ });
+ // page 2
+ restApiRequestHttp(
+ "http://localhost:" + HTTP_PORT2 + "/finished-jobs?page=2&rows=" + pageSize,
+ (code, content) -> {
+ Assertions.assertEquals(200, code);
+ JsonObject resultJson = (JsonObject) Json.parse(content);
+ Assertions.assertTrue(
+ resultJson.get("data") != null && resultJson.get("total") != null);
+ int total = resultJson.getInt("total", 0);
+ JsonArray data = (JsonArray) resultJson.get("data");
+ Assertions.assertTrue(total == jobNum && data.size() == 2);
+ });
+ // no pagination test
+ restApiRequestHttp(
+ "http://localhost:" + HTTP_PORT2 + "/finished-jobs",
+ (code, content) -> {
+ Assertions.assertEquals(200, code);
+ JsonArray resultJson = (JsonArray) Json.parse(content);
+ Assertions.assertTrue(resultJson != null);
+ Assertions.assertTrue(resultJson.size() == jobNum);
+ });
+ shutdown(jobInformation);
+ }
+
+ @Test
+ public void testRunningJobsApi() throws Exception {
+ JobInformation jobInformation = getSeatunnelServer("testRunningJobs");
+ int jobNum = 20;
+ int pageSize = 5;
+ long jobId = 2000L;
+ for (int i = 0; i < jobNum; i++) {
+ startJob(i + jobId, "stream_fake_to_console.conf", jobInformation);
+ }
+
+ // wait until all jobs are running
+ await().atMost(60, TimeUnit.SECONDS)
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ assertEquals(
+ jobNum,
+ jobInformation
+ .coordinatorService
+ .getRunningJobMetrics()
+ .size()));
+
+ // pagination test
+ restApiRequestHttp(
+ "http://localhost:" + HTTP_PORT2 + "/running-jobs?page=1&rows=" + pageSize,
+ (code, content) -> {
+ Assertions.assertEquals(200, code);
+ JsonObject resultJson = (JsonObject) Json.parse(content);
+ Assertions.assertTrue(
+ resultJson.get("data") != null && resultJson.get("total") != null);
+ int total = resultJson.getInt("total", 0);
+ JsonArray data = (JsonArray) resultJson.get("data");
+ Assertions.assertTrue(total == jobNum && data.size() == pageSize);
+ });
+ // no pagination test
+ restApiRequestHttp(
+ "http://localhost:" + HTTP_PORT2 + "/running-jobs",
+ (code, content) -> {
+ Assertions.assertEquals(200, code);
+ JsonArray resultJson = (JsonArray) Json.parse(content);
+ Assertions.assertTrue(resultJson != null);
+ Assertions.assertTrue(resultJson.size() == jobNum);
+ });
+ shutdown(jobInformation);
+ }
+
+ @Test
+ public void testPageNumberOutOfRange() throws Exception {
+ JobInformation jobInformation = getSeatunnelServer("testPageNumberOutOfRange");
+ int jobNum = 7;
+ int pageSize = 5;
+ long jobId = 3000L;
+ for (int i = 0; i < jobNum; i++) {
+ startJob(i + jobId, "fake_to_console.conf", jobInformation);
+ }
+
+ // wait until all jobs are finished
+ await().pollDelay(5, TimeUnit.SECONDS)
+ .atMost(30, TimeUnit.SECONDS)
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ assertEquals(
+ jobNum,
+ jobInformation
+ .coordinatorService
+ .getJobCountMetrics()
+ .getFinishedJobCount()));
+
+ restApiRequestHttp(
+ "http://localhost:" + HTTP_PORT2 + "/finished-jobs?page=10&rows=" + pageSize,
+ (code, content) -> {
+ Assertions.assertEquals(400, code);
+ Assertions.assertTrue(content.contains("Page number exceeds total pages"));
+ });
+ shutdown(jobInformation);
+ }
+
+ private void restApiRequestHttp(String url, RestApiRequestCallback callback) throws Exception {
+ HttpURLConnection conn = (HttpURLConnection) new java.net.URL(url).openConnection();
+ if (conn.getResponseCode() != 200) {
+ try (BufferedReader in =
+ new BufferedReader(new InputStreamReader(conn.getErrorStream()))) {
+ String response = in.lines().collect(Collectors.joining());
+ if (callback != null) {
+ callback.callback(conn.getResponseCode(), response);
+ }
+ } finally {
+ conn.disconnect();
+ }
+ } else {
+ try (BufferedReader in =
+ new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
+ String response = in.lines().collect(Collectors.joining());
+ if (callback != null) {
+ callback.callback(conn.getResponseCode(), response);
+ }
+ } finally {
+ conn.disconnect();
+ }
+ }
+ }
+
+ private void startJob(Long jobId, String path, JobInformation jobInformation) {
+ LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId);
+
+ JobImmutableInformation jobImmutableInformation =
+ new JobImmutableInformation(
+ jobId,
+ "Test",
+ jobInformation.healcastInstance.node.nodeEngine.getSerializationService(),
+ testLogicalDag,
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ Data data =
+ jobInformation
+ .healcastInstance
+ .node
+ .nodeEngine
+ .getSerializationService()
+ .toData(jobImmutableInformation);
+
+ PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+ jobInformation.coordinatorService.submitJob(
+ jobId, data, jobImmutableInformation.isStartWithSavePoint());
+ voidPassiveCompletableFuture.join();
+ }
+
+ private JobInformation getSeatunnelServer(String testClassName) {
+ Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
+ hazelcastConfig.setClusterName(
+ TestUtils.getClusterName("RestApiHttpsTest_" + testClassName));
+ SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
+ seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
+ seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL);
+
+ HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig();
+ httpConfig.setEnabled(true);
+ httpConfig.setPort(HTTP_PORT2);
+ httpConfig.setHttpsPort(HTTPS_PORT2);
+ httpConfig.setEnableHttps(false);
+
+ HazelcastInstanceImpl healcastInstance =
+ SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+ SeaTunnelServer server1 =
+ healcastInstance.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+ CoordinatorService coordinatorService = server1.getCoordinatorService();
+ Assertions.assertTrue(coordinatorService.isCoordinatorActive());
+ return new JobInformation(healcastInstance, coordinatorService, server1);
+ }
+
+ private void shutdown(JobInformation jobInformation) {
+ if (jobInformation.server != null) {
+ jobInformation.server.shutdown(true);
+ }
+ if (jobInformation.healcastInstance != null) {
+ jobInformation.healcastInstance.shutdown();
+ }
+ }
+
+ private static class JobInformation {
+
+ public final HazelcastInstanceImpl healcastInstance;
+ public final CoordinatorService coordinatorService;
+ public final SeaTunnelServer server;
+
+ public JobInformation(
+ HazelcastInstanceImpl coordinatorServiceTest,
+ CoordinatorService coordinatorService,
+ SeaTunnelServer server) {
+ this.healcastInstance = coordinatorServiceTest;
+ this.coordinatorService = coordinatorService;
+ this.server = server;
+ }
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-ui/src/service/job/index.ts b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiRequestCallback.java
similarity index 63%
copy from seatunnel-engine/seatunnel-engine-ui/src/service/job/index.ts
copy to seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiRequestCallback.java
index 4e05032cd5..804971ad1c 100644
--- a/seatunnel-engine/seatunnel-engine-ui/src/service/job/index.ts
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiRequestCallback.java
@@ -15,17 +15,9 @@
* limitations under the License.
*/
-import { get } from '@/service/service'
-import type { Job } from './types'
+package org.apache.seatunnel.engine.server.rest;
-export const getRunningJobs = () => get<Job[]>('/running-jobs')
-export const getFinishedJobs = () => get<Job[]>(`/finished-jobs`)
-export const getJobInfo = (jobId: string) => get<Job>(`/job-info/${jobId}`)
-export const getRunningJobInfo = (jobId: string) => get<Job>(`/running-job/${jobId}`)
-
-export const JobsService = {
- getRunningJobs,
- getFinishedJobs,
- getJobInfo,
- getRunningJobInfo
+@FunctionalInterface
+public interface RestApiRequestCallback {
+ void callback(int responseCode, String responseContent);
}
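This small functional interface is what restApiRequestHttp in the test above hands the HTTP status code and response body to. A minimal usage sketch (the values passed in here are made up; in the tests the real response is supplied):

```java
import org.apache.seatunnel.engine.server.rest.RestApiRequestCallback;

public class CallbackSketch {
    public static void main(String[] args) {
        // Assertions or logging can be expressed as a lambda over the (status code, body) pair.
        RestApiRequestCallback printResponse =
                (responseCode, responseContent) -> {
                    System.out.println("HTTP " + responseCode);
                    System.out.println(responseContent);
                };
        printResponse.callback(200, "{\"data\":[],\"total\":0}"); // illustrative values only
    }
}
```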
diff --git a/seatunnel-engine/seatunnel-engine-ui/src/service/job/index.ts b/seatunnel-engine/seatunnel-engine-ui/src/service/job/index.ts
index 4e05032cd5..f494cac1a2 100644
--- a/seatunnel-engine/seatunnel-engine-ui/src/service/job/index.ts
+++ b/seatunnel-engine/seatunnel-engine-ui/src/service/job/index.ts
@@ -16,10 +16,10 @@
*/
import { get } from '@/service/service'
-import type { Job } from './types'
+import type {Job, JobPage} from './types'
-export const getRunningJobs = () => get<Job[]>('/running-jobs')
-export const getFinishedJobs = () => get<Job[]>(`/finished-jobs`)
+export const getRunningJobs = (page: number, rows: number) => get<JobPage>('/running-jobs', {page: page, rows: rows})
+export const getFinishedJobs = (page: number, rows: number) => get<JobPage>(`/finished-jobs`, {page: page, rows: rows})
export const getJobInfo = (jobId: string) => get<Job>(`/job-info/${jobId}`)
export const getRunningJobInfo = (jobId: string) => get<Job>(`/running-job/${jobId}`)
diff --git a/seatunnel-engine/seatunnel-engine-ui/src/service/job/types.ts b/seatunnel-engine/seatunnel-engine-ui/src/service/job/types.ts
index 00c72ec979..8a6bb836a1 100644
--- a/seatunnel-engine/seatunnel-engine-ui/src/service/job/types.ts
+++ b/seatunnel-engine/seatunnel-engine-ui/src/service/job/types.ts
@@ -78,3 +78,8 @@ export interface Job {
metrics: Metrics
pluginJarsUrls: []
}
+
+export interface JobPage {
+ total: number
+ data: Job[]
+}
diff --git a/seatunnel-engine/seatunnel-engine-ui/src/tests/jobs.spec.ts b/seatunnel-engine/seatunnel-engine-ui/src/tests/jobs.spec.ts
index fd0df0b300..376667a275 100644
--- a/seatunnel-engine/seatunnel-engine-ui/src/tests/jobs.spec.ts
+++ b/seatunnel-engine/seatunnel-engine-ui/src/tests/jobs.spec.ts
@@ -24,7 +24,7 @@ import { createPinia, setActivePinia } from 'pinia'
import i18n from '@/locales'
import finishedJobs from '@/views/jobs/finished-jobs'
import { JobsService } from '@/service/job'
-import type { Job } from '@/service/job/types'
+import type { JobPage, Job } from '@/service/job/types'
describe('jobs', () => {
const app = createApp({})
@@ -34,7 +34,7 @@ describe('jobs', () => {
setActivePinia(createPinia())
})
test('Running Jobs component', async () => {
- const mockData = [] as Job[]
+ const mockData = {} as JobPage
vi.spyOn(JobsService, 'getRunningJobs').mockResolvedValue(mockData)
const wrapper = mount(runningJobs, {
@@ -47,16 +47,16 @@ describe('jobs', () => {
expect(wrapper.text()).toContain('Running Jobs')
})
test('Finished Jobs component', async () => {
- const mockData = [
- {
- jobId: '888413907541032961',
- jobName: 'SeaTunnel_Job',
- jobStatus: 'FINISHED',
- errorMsg: '',
- createTime: '2024-09-17 21:19:41',
- finishTime: '2024-09-17 21:19:44'
- }
- ] as Job[]
+ const mockData = { data: [
+ {
+ jobId: '888413907541032961',
+ jobName: 'SeaTunnel_Job',
+ jobStatus: 'FINISHED',
+ errorMsg: '',
+ createTime: '2024-09-17 21:19:41',
+ finishTime: '2024-09-17 21:19:44'
+ }
+ ] as Job[], total: 1} as JobPage
vi.spyOn(JobsService, 'getFinishedJobs').mockResolvedValue(mockData)
@@ -67,7 +67,7 @@ describe('jobs', () => {
}
})
expect(JobsService.getFinishedJobs).toHaveBeenCalledTimes(1)
- expect(JobsService.getFinishedJobs).toHaveBeenCalledWith()
+ expect(JobsService.getFinishedJobs).toHaveBeenCalledWith(1, 10)
await flushPromises()
expect(wrapper.text()).toContain('SeaTunnel_Job')
})
diff --git a/seatunnel-engine/seatunnel-engine-ui/src/views/jobs/finished-jobs.tsx b/seatunnel-engine/seatunnel-engine-ui/src/views/jobs/finished-jobs.tsx
index 6d916c8d3c..a399173858 100644
--- a/seatunnel-engine/seatunnel-engine-ui/src/views/jobs/finished-jobs.tsx
+++ b/seatunnel-engine/seatunnel-engine-ui/src/views/jobs/finished-jobs.tsx
@@ -30,10 +30,15 @@ export default defineComponent({
const { t } = useI18n()
const jobs = ref([] as Job[])
+ const page = ref(1)
+ const pageSize = ref(10)
+ const total = ref(0)
let timer: NodeJS.Timeout
const fetch = async () => {
- jobs.value = await JobsService.getFinishedJobs()
+ const res = await JobsService.getFinishedJobs(page.value, pageSize.value)
+ jobs.value = res.data
+ total.value = res.total
timer = setTimeout(fetch, 5000)
}
onUnmounted(() => clearTimeout(timer))
@@ -105,7 +110,23 @@ export default defineComponent({
return () => (
<div class="w-full bg-white p-6 border border-gray-100 rounded-xl">
<h2 class="font-bold text-2xl pb-6">{t('jobs.finishedJobs')}</h2>
- <NDataTable columns={columns} data={jobs.value} pagination={false} bordered={false} />
+ <NDataTable columns={columns} data={jobs.value} remote={true} pagination={{
+ page: page.value,
+ pageSize: pageSize.value,
+ itemCount: total.value,
+ showSizePicker: true,
+ pageSizes: [10, 20, 50, 100, 500],
+ showQuickJumper: true,
+ onUpdatePage: (newPage: number) => {
+ page.value = newPage
+ fetch()
+ },
+ onUpdatePageSize: (newPageSize: number) => {
+ pageSize.value = newPageSize
+ page.value = 1
+ fetch()
+ }
+ }} bordered={false} />
</div>
)
}
diff --git a/seatunnel-engine/seatunnel-engine-ui/src/views/jobs/running-jobs.tsx b/seatunnel-engine/seatunnel-engine-ui/src/views/jobs/running-jobs.tsx
index c55a46ca71..f5daa5f84e 100644
--- a/seatunnel-engine/seatunnel-engine-ui/src/views/jobs/running-jobs.tsx
+++ b/seatunnel-engine/seatunnel-engine-ui/src/views/jobs/running-jobs.tsx
@@ -30,10 +30,15 @@ export default defineComponent({
const { t } = useI18n()
const jobs = ref([] as Job[])
+ const page = ref(1)
+ const pageSize = ref(10)
+ const total = ref(0)
let timer: NodeJS.Timeout
const fetch = async () => {
- jobs.value = await JobsService.getRunningJobs()
+ const res = await JobsService.getRunningJobs(page.value, pageSize.value)
+ jobs.value = res.data
+ total.value = res.total
timer = setTimeout(fetch, 5000)
}
onUnmounted(() => clearTimeout(timer))
@@ -101,7 +106,23 @@ export default defineComponent({
return () => (
<div class="w-full bg-white p-6 border border-gray-100 rounded-xl">
<h2 class="font-bold text-2xl pb-6">{t('jobs.runningJobs')}</h2>
- <NDataTable columns={columns} data={jobs.value} pagination={false} bordered={false} />
+ <NDataTable columns={columns} data={jobs.value} remote={true} pagination={{
+ page: page.value,
+ pageSize: pageSize.value,
+ itemCount: total.value,
+ showSizePicker: true,
+ pageSizes: [10, 20, 50, 100, 500],
+ showQuickJumper: true,
+ onUpdatePage: (newPage: number) => {
+ page.value = newPage
+ fetch()
+ },
+ onUpdatePageSize: (newPageSize: number) => {
+ pageSize.value = newPageSize
+ page.value = 1
+ fetch()
+ }
+ }} bordered={false} />
</div>
)
}