This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 7205efe1 [Improvement][Seatunnel-web] Add support to execute
seatunnel-web REST API e2e (#180)
7205efe1 is described below
commit 7205efe14cf42683ffb9a22113d6d4fe32e6f857
Author: Mohammad Arshad <[email protected]>
AuthorDate: Wed Aug 14 17:44:55 2024 +0530
[Improvement][Seatunnel-web] Add support to execute seatunnel-web REST API
e2e (#180)
---
.gitignore | 1 +
pom.xml | 3 +-
.../datasource/AbstractDataSourceClient.java | 5 +-
seatunnel-web-it/README.md | 13 ++
seatunnel-web-it/pom.xml | 99 +++++++++++
.../seatunnel/app/common/SeaTunnelWebCluster.java | 95 +++++++++++
.../app/common/SeatunnelWebTestingBase.java | 116 +++++++++++++
.../apache/seatunnel/app/common/TokenProvider.java | 40 +++++
.../app/controller/ConnectorControllerWrapper.java | 64 +++++++
.../app/controller/JobConfigControllerWrapper.java | 63 +++++++
.../controller/JobDefinitionControllerWrapper.java | 75 ++++++++
.../controller/JobExecutorControllerWrapper.java | 50 ++++++
.../controller/JobMetricsControllerWrapper.java | 49 ++++++
.../app/controller/JobTaskControllerWrapper.java | 189 +++++++++++++++++++++
.../SeatunnelDatasourceControllerWrapper.java | 107 ++++++++++++
.../app/controller/UserControllerWrapper.java | 58 +++++++
.../app/domain/ConnectorInfoDeserializer.java | 48 ++++++
.../app/domain/PluginIdentifierDeserializer.java | 42 +++++
.../app/test/ConnectorControllerTest.java | 78 +++++++++
.../app/test/JobConfigControllerTest.java | 74 ++++++++
.../app/test/JobDefinitionControllerTest.java | 82 +++++++++
.../app/test/JobExecutorControllerTest.java | 74 ++++++++
.../app/test/JobMetricsControllerTest.java | 88 ++++++++++
.../seatunnel/app/test/JobTaskControllerTest.java | 124 ++++++++++++++
.../test/SeatunnelDatasourceControllerTest.java | 131 ++++++++++++++
.../seatunnel/app/test/UserControllerTest.java | 104 ++++++++++++
.../apache/seatunnel/app/utils/JSONTestUtils.java | 151 ++++++++++++++++
.../org/apache/seatunnel/app/utils/JobUtils.java | 141 +++++++++++++++
.../src/test/resources/application.yml | 60 +++++++
.../src/test/resources/hazelcast-client.yaml | 27 +++
seatunnel-web-it/src/test/resources/hazelcast.yaml | 47 +++++
.../src/test/resources/logback-spring.xml | 48 ++++++
seatunnel-web-it/src/test/resources/seatunnel.yaml | 36 ++++
tools/dependencies/known-dependencies.txt | 7 +-
34 files changed, 2386 insertions(+), 3 deletions(-)
diff --git a/.gitignore b/.gitignore
index 9bf8b904..fb8bd5d7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -52,3 +52,4 @@ spark-warehouse
/seatunnel-ui/node/*
/seatunnel-ui/node_modules
/seatunnel-ui/node_modules/*
+/seatunnel-web-it/profile/*
diff --git a/pom.xml b/pom.xml
index 52008742..880c3162 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,7 @@
<module>seatunnel-server</module>
<module>seatunnel-datasource</module>
<module>seatunnel-web-dist</module>
+ <module>seatunnel-web-it</module>
</modules>
<properties>
@@ -68,7 +69,7 @@
<junit.version>5.9.0</junit.version>
<commons-collections4.version>4.4</commons-collections4.version>
<commons-lang3.version>3.4</commons-lang3.version>
- <guava.version>19.0</guava.version>
+ <guava.version>33.2.1-jre</guava.version>
<checker.qual.version>3.10.0</checker.qual.version>
<awaitility.version>4.2.0</awaitility.version>
<seatunnel-framework.version>2.3.6</seatunnel-framework.version>
diff --git
a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java
b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java
index 34fb2850..53ec0f06 100644
---
a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java
+++
b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java
@@ -221,7 +221,10 @@ public abstract class AbstractDataSourceClient implements
DataSourceService {
}
private ClassLoader getCustomClassloader(String pluginName) {
- String getenv = System.getenv(ST_WEB_BASEDIR_PATH);
+ String getenv =
+ System.getenv(ST_WEB_BASEDIR_PATH) == null
+ ? System.getProperty(ST_WEB_BASEDIR_PATH)
+ : System.getenv(ST_WEB_BASEDIR_PATH);
log.info("ST_WEB_BASEDIR_PATH is : " + getenv);
String libPath = StringUtils.isEmpty(getenv) ? "/datasource" : (getenv
+ "/datasource");
diff --git a/seatunnel-web-it/README.md b/seatunnel-web-it/README.md
new file mode 100644
index 00000000..9e6028f1
--- /dev/null
+++ b/seatunnel-web-it/README.md
@@ -0,0 +1,13 @@
+Build seatunnel-web
+./mvnw clean install -DskipTests
+
+Update the MySQL database details in src/test/resources/application.yml, then run the seatunnel-web-it integration tests:
+./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false
-DSEATUNNEL_HOME=/some/path/apache-seatunnel-2.3.6
-DST_WEB_BASEDIR_PATH=seatunnel-web-dist/target/apache-seatunnel-web-1.0.0-SNAPSHOT/apache-seatunnel-web-1.0.0-SNAPSHOT
+NOTE: Please remember to update the versions according to the latest supported
versions.
+
+If you run the tests on a Java version newer than Java 8, also pass the following Maven option:
+-DitJvmArgs="--add-opens java.base/java.lang.invoke=ALL-UNNAMED"
+
+When running the integration tests from an IDE, make sure the following VM options (Java system properties) are set:
+-DSEATUNNEL_HOME=/some/path/apache-seatunnel-2.3.6
+-DST_WEB_BASEDIR_PATH=/some/path/seatunnel-web-dist/target/apache-seatunnel-web-1.0.0-SNAPSHOT/apache-seatunnel-web-1.0.0-SNAPSHOT
diff --git a/seatunnel-web-it/pom.xml b/seatunnel-web-it/pom.xml
new file mode 100644
index 00000000..328dffdd
--- /dev/null
+++ b/seatunnel-web-it/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-web</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>seatunnel-web-it</artifactId>
+ <properties>
+ <!-- Value of the surefire <skip> flag below; true means this module's tests are skipped unless -DskipIT=false is passed. -->
+ <skipIT>true</skipIT>
+ <!-- Default value of the surefire <argLine> below; passing -DitJvmArgs=... on the command line replaces it. -->
+ <itJvmArgs>-Xmx1024m</itJvmArgs>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-engine-server</artifactId>
+ <version>${seatunnel-framework.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-server</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-app</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql-connector.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
+ <version>${seatunnel-framework.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <!-- Runs the REST API integration tests of this module through surefire. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <!-- Controlled by the skipIT property declared in <properties>. -->
+ <skip>${skipIT}</skip>
+ <!-- NOTE(review): with testFailureIgnore=true surefire reports failing tests but does not fail the build; confirm this is intended for CI. -->
+ <testFailureIgnore>true</testFailureIgnore>
+ <!-- One forked JVM that is not reused between test classes. -->
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ <!-- JVM arguments for the forked test JVM, taken from the itJvmArgs property. -->
+ <argLine>${itJvmArgs}</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeaTunnelWebCluster.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeaTunnelWebCluster.java
new file mode 100644
index 00000000..a5b630a9
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeaTunnelWebCluster.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.common;
+
+import org.apache.seatunnel.app.SeatunnelApplication;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.logging.ILogger;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test fixture that starts an in-process SeaTunnel engine node followed by the seatunnel-web
+ * Spring application, and shuts both down again.
+ */
+@Slf4j
+public class SeaTunnelWebCluster {
+    private SeaTunnelServer server;
+    private HazelcastInstanceImpl instance;
+    private ConfigurableApplicationContext applicationContext;
+
+    /**
+     * Creates the engine node, then boots the web application and asserts that the node is
+     * the master.
+     *
+     * @throws RuntimeException if the {@code SEATUNNEL_HOME} system property is not set or
+     *     points to a path that does not exist
+     */
+    public void start() {
+        String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
+        if (seatunnelHome == null) {
+            throw new RuntimeException(
+                    "SEATUNNEL_HOME is not set. Please set it before running the tests.");
+        }
+        if (!new File(seatunnelHome).exists()) {
+            throw new RuntimeException(
+                    seatunnelHome
+                            + " does not exist. Please make sure it exists before running the tests");
+        }
+        Config hazelcastConfig = Config.loadDefault();
+        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
+        instance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+        server = instance.node.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+        ILogger logger = instance.node.nodeEngine.getLogger(SeaTunnelWebCluster.class);
+
+        // String[] args = {"--spring.profiles.active=h2"};
+        String[] args = {};
+        applicationContext = SpringApplication.run(SeatunnelApplication.class, args);
+        logger.info("SeaTunnel-web server started.");
+        assertTrue(isRunning());
+    }
+
+    /**
+     * @return {@code true} if the engine server has been created and reports itself as the
+     *     master node; {@code false} if {@link #start()} has not created it yet
+     */
+    public boolean isRunning() {
+        return server != null && server.isMasterNode();
+    }
+
+    /**
+     * Stops the web application, the engine server and the Hazelcast instance. Each step is
+     * attempted independently so that a failure in one does not leave the others running;
+     * failures are logged, not rethrown.
+     */
+    public void stop() {
+        try {
+            if (applicationContext != null) {
+                int exit = SpringApplication.exit(applicationContext);
+                log.info("Sea tunnel application exited with code: {}", exit);
+            }
+        } catch (Throwable throwable) {
+            log.error("Error stopping application context", throwable);
+        }
+
+        try {
+            if (server != null) {
+                server.shutdown(true);
+            }
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getMessage(e));
+        }
+
+        // Kept in its own try block: the instance must be shut down even when the server
+        // shutdown above failed.
+        try {
+            if (instance != null) {
+                instance.shutdown();
+            }
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getMessage(e));
+        }
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java
new file mode 100644
index 00000000..19bd0746
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/SeatunnelWebTestingBase.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.common;
+
+import org.apache.seatunnel.app.domain.request.user.UserLoginReq;
+import org.apache.seatunnel.app.domain.response.user.UserSimpleInfoRes;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * Base class of the REST API test wrappers: builds endpoint URLs under {@link #baseUrl} and
+ * performs blocking HTTP calls with {@link HttpURLConnection}.
+ */
+public class SeatunnelWebTestingBase {
+    /** Charset used to encode request bodies and to decode response bodies. */
+    private static final java.nio.charset.Charset UTF_8 =
+            java.nio.charset.StandardCharsets.UTF_8;
+
+    protected final String baseUrl = "http://localhost:8802/seatunnel/api/v1";
+
+    /**
+     * Posts the given credentials to {@code user/login}.
+     *
+     * @param userLoginReq login request, serialized to JSON as the request body
+     * @return the parsed login result
+     */
+    protected Result<UserSimpleInfoRes> login(UserLoginReq userLoginReq) {
+        String requestBody = JSONUtils.toPrettyJsonString(userLoginReq);
+        String response = sendRequest(url("user/login"), requestBody, "POST");
+        return JSONTestUtils.parseObject(
+                response, new TypeReference<Result<UserSimpleInfoRes>>() {});
+    }
+
+    /** Returns {@code baseUrl/path?}, i.e. the endpoint URL with an empty query string. */
+    protected String url(String path) {
+        return String.format("%s/%s?", baseUrl, path);
+    }
+
+    /** Returns {@code baseUrl/pathAndParam}; the caller supplies the query string itself. */
+    protected String urlWithParam(String pathAndParam) {
+        return String.format("%s/%s", baseUrl, pathAndParam);
+    }
+
+    /** Sends a GET request without a body. */
+    protected String sendRequest(String url) {
+        return sendRequest(url, null, "GET");
+    }
+
+    /**
+     * Sends an HTTP request and returns the response body.
+     *
+     * @param url full request URL
+     * @param requestBody JSON body to send, or {@code null} for none
+     * @param httpMethod HTTP method name, e.g. {@code GET}, {@code POST}, {@code PATCH}
+     * @return the response body with line terminators removed
+     * @throws RuntimeException if the request fails or the status code is not 200
+     */
+    protected String sendRequest(String url, String requestBody, String httpMethod) {
+        HttpURLConnection connection = null;
+        try {
+            URL urlObject = new URL(url);
+            connection = (HttpURLConnection) urlObject.openConnection();
+            if ("PATCH".equalsIgnoreCase(httpMethod)) {
+                // HttpURLConnection.setRequestMethod does not accept PATCH, so the field is
+                // written directly.
+                setRequestMethodUsingReflection(connection, "PATCH");
+            } else {
+                connection.setRequestMethod(httpMethod);
+            }
+
+            connection.setRequestProperty("Content-Type", "application/json");
+            // The login request is the one that obtains the token, so it is sent without it.
+            if (!url.endsWith("user/login?")) {
+                connection.setRequestProperty("token", TokenProvider.getToken());
+            }
+            connection.setDoOutput(true);
+            if (requestBody != null) {
+                try (OutputStream os = connection.getOutputStream()) {
+                    os.write(requestBody.getBytes(UTF_8));
+                }
+            }
+            int responseCode = connection.getResponseCode();
+            if (responseCode == HttpURLConnection.HTTP_OK) {
+                return readResponse(connection);
+            } else {
+                String message = "API Request failed with status code: " + responseCode;
+                throw new RuntimeException(message);
+            }
+
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            if (connection != null) {
+                connection.disconnect();
+            }
+        }
+    }
+
+    /** Sets the private {@code method} field of the connection via reflection. */
+    private static void setRequestMethodUsingReflection(
+            HttpURLConnection httpURLConnection, String method) {
+        try {
+            Field methodField = HttpURLConnection.class.getDeclaredField("method");
+            methodField.setAccessible(true);
+            methodField.set(httpURLConnection, method);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new IllegalStateException("Failed to set HTTP method to PATCH", e);
+        }
+    }
+
+    /**
+     * Reads the whole response body line by line. The body is decoded as UTF-8 rather than
+     * with the platform default charset, and the reader is closed even if reading fails.
+     */
+    private String readResponse(HttpURLConnection connection) throws IOException {
+        StringBuilder result = new StringBuilder();
+        try (BufferedReader rd =
+                new BufferedReader(new InputStreamReader(connection.getInputStream(), UTF_8))) {
+            String line;
+            while ((line = rd.readLine()) != null) {
+                result.append(line);
+            }
+        }
+        return result.toString();
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/TokenProvider.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/TokenProvider.java
new file mode 100644
index 00000000..7f58495a
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/common/TokenProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.common;
+
+import org.apache.seatunnel.app.domain.request.user.UserLoginReq;
+import org.apache.seatunnel.app.domain.response.user.UserSimpleInfoRes;
+
+/** Lazily logs in as the {@code admin} user and caches the returned API token. */
+public class TokenProvider {
+    private static String token;
+
+    /**
+     * @return the cached token, logging in first if no token has been obtained yet
+     * @throws IllegalStateException if the login response carries no user data
+     */
+    public static String getToken() {
+        if (token == null) {
+            initToken();
+        }
+        return token;
+    }
+
+    /** Performs the login call and stores the token from its response. */
+    private static void initToken() {
+        SeatunnelWebTestingBase seatunnelWebTestingBase = new SeatunnelWebTestingBase();
+        UserLoginReq userLoginReq = new UserLoginReq();
+        userLoginReq.setUsername("admin");
+        userLoginReq.setPassword("admin");
+        Result<UserSimpleInfoRes> loginResponse = seatunnelWebTestingBase.login(userLoginReq);
+        // Fail with an explicit message instead of a bare NullPointerException when the
+        // login did not return a user.
+        if (loginResponse == null || loginResponse.getData() == null) {
+            throw new IllegalStateException(
+                    "Login as 'admin' returned no user data; cannot obtain an API token");
+        }
+        TokenProvider.token = loginResponse.getData().getToken();
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java
new file mode 100644
index 00000000..8a431feb
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/ConnectorControllerWrapper.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.List;
+
+/** Client-side wrapper around the {@code connector/*} REST endpoints. */
+public class ConnectorControllerWrapper extends SeatunnelWebTestingBase {
+
+    /** Fetches {@code connector/transforms} and converts its {@code data} node to a list. */
+    public List<ConnectorInfo> listAllTransform() {
+        return fetchConnectors(url("connector/transforms"));
+    }
+
+    /** Fetches {@code connector/sources} for the given status. */
+    public List<ConnectorInfo> listSource(String status) {
+        String pathAndParam = "connector/sources?status=" + status;
+        return fetchConnectors(urlWithParam(pathAndParam));
+    }
+
+    /** Fetches {@code connector/sinks} for the given status. */
+    public List<ConnectorInfo> listSink(String status) {
+        String pathAndParam = "connector/sinks?status=" + status;
+        return fetchConnectors(urlWithParam(pathAndParam));
+    }
+
+    /** Calls {@code connector/sync} and parses the reply as a {@code Result}. */
+    public Result<Void> sync() {
+        String syncUrl = url("connector/sync");
+        return JSONTestUtils.parseObject(sendRequest(syncUrl), Result.class);
+    }
+
+    /** Calls {@code connector/form} for the given connector type and name. */
+    public Result<Void> getConnectorFormStructure(String connectorType, String connectorName) {
+        String pathAndParam =
+                "connector/form?connectorType="
+                        + connectorType
+                        + "&connectorName="
+                        + connectorName;
+        String body = sendRequest(urlWithParam(pathAndParam));
+        return JSONTestUtils.parseObject(body, Result.class);
+    }
+
+    /** Sends a GET to {@code requestUrl} and maps the response's {@code data} value. */
+    private List<ConnectorInfo> fetchConnectors(String requestUrl) {
+        String body = sendRequest(requestUrl);
+        JsonNode dataNode = JSONUtils.parseObject(body).findValue("data");
+        return JSONTestUtils.toList(dataNode.toString(), ConnectorInfo.class);
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java
new file mode 100644
index 00000000..55c5627c
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobConfigControllerWrapper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.EngineType;
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.request.job.JobConfig;
+import org.apache.seatunnel.app.domain.response.job.JobConfigRes;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
+
+import org.apache.commons.collections.map.HashedMap;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.util.Map;
+
+/** Client-side wrapper around the {@code job/config/{jobVersionId}} REST endpoints. */
+public class JobConfigControllerWrapper extends SeatunnelWebTestingBase {
+
+    /** Sends the job config as JSON with a PUT request. */
+    public Result<Void> updateJobConfig(long jobVersionId, JobConfig jobConfig) {
+        String endpoint = url("job/config/" + jobVersionId);
+        String payload = JSONUtils.toPrettyJsonString(jobConfig);
+        return JSONTestUtils.parseObject(sendRequest(endpoint, payload, "PUT"), Result.class);
+    }
+
+    /** Reads the job config with a GET request. */
+    public Result<JobConfigRes> getJobConfig(long jobVersionId) {
+        String endpoint = url("job/config/" + jobVersionId);
+        String body = sendRequest(endpoint);
+        return JSONTestUtils.parseObject(body, new TypeReference<Result<JobConfigRes>>() {});
+    }
+
+    /**
+     * Builds a {@code JobConfig} for the SeaTunnel engine with a fixed set of environment
+     * entries.
+     *
+     * @param jobName used as the config name and as prefix of its description
+     */
+    public JobConfig populateJobConfigObject(String jobName) {
+        Map<String, Object> envSettings = new HashedMap();
+        envSettings.put("job.mode", "BATCH");
+        envSettings.put("job.name", "SeaTunnel_Job");
+        envSettings.put("jars", "");
+        envSettings.put("checkpoint.interval", "30");
+        envSettings.put("checkpoint.timeout", "");
+        envSettings.put("read_limit.rows_per_second", "");
+        envSettings.put("read_limit.bytes_per_second", "");
+        envSettings.put("custom_parameters", "");
+
+        JobConfig config = new JobConfig();
+        config.setName(jobName);
+        config.setDescription(jobName + " description from config");
+        config.setEngine(EngineType.SeaTunnel);
+        config.setEnv(envSettings);
+        return config;
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java
new file mode 100644
index 00000000..de10511f
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobDefinitionControllerWrapper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.JobReq;
+import org.apache.seatunnel.app.domain.response.PageInfo;
+import org.apache.seatunnel.app.domain.response.job.JobDefinitionRes;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Client-side wrapper around the {@code job/definition} REST endpoints. */
+public class JobDefinitionControllerWrapper extends SeatunnelWebTestingBase {
+
+    /** Posts the request as JSON to {@code job/definition}. */
+    public Result<Long> createJobDefinition(JobReq jobReq) {
+        String requestBody = JSONUtils.toPrettyJsonString(jobReq);
+        String response = sendRequest(url("job/definition"), requestBody, "POST");
+        return JSONTestUtils.parseObject(response, new TypeReference<Result<Long>>() {});
+    }
+
+    /**
+     * Creates a {@code DATA_INTEGRATION} job definition with the given name, asserts that the
+     * call succeeded and returns the data of the result.
+     */
+    public Long createJobDefinition(String jobName) {
+        JobReq jobReq = new JobReq();
+        jobReq.setName(jobName);
+        jobReq.setDescription(jobName + " description");
+        jobReq.setJobType(BusinessMode.DATA_INTEGRATION);
+        Result<Long> result = createJobDefinition(jobReq);
+        assertTrue(result.isSuccess());
+        return result.getData();
+    }
+
+    /**
+     * Queries {@code job/definition} with a search name and paging parameters.
+     *
+     * @param searchName free-text filter; URL-encoded before it is put into the query string
+     * @param pageNo page number query parameter
+     * @param pageSize page size query parameter
+     */
+    public Result<PageInfo<JobDefinitionRes>> getJobDefinition(
+            String searchName, Integer pageNo, Integer pageSize) {
+        String response =
+                sendRequest(
+                        urlWithParam("job/definition?")
+                                + "searchName="
+                                + encodeQueryValue(searchName)
+                                + "&pageNo="
+                                + pageNo
+                                + "&pageSize="
+                                + pageSize);
+        return JSONTestUtils.parseObject(
+                response, new TypeReference<Result<PageInfo<JobDefinitionRes>>>() {});
+    }
+
+    /** Reads a single job definition by id. */
+    public Result<JobDefinitionRes> getJobDefinitionById(long jobId) {
+        String response = sendRequest(url("job/definition/" + jobId));
+        return JSONTestUtils.parseObject(
+                response, new TypeReference<Result<JobDefinitionRes>>() {});
+    }
+
+    /** Sends a DELETE request for the job definition with the given id. */
+    public Result<Void> deleteJobDefinition(long id) {
+        String response = sendRequest(urlWithParam("job/definition?id=" + id), null, "DELETE");
+        return JSONTestUtils.parseObject(response, Result.class);
+    }
+
+    /**
+     * URL-encodes a query parameter value as UTF-8 so that spaces and reserved characters do
+     * not corrupt the request. A {@code null} value is sent as the text {@code null}, as
+     * plain string concatenation would do.
+     */
+    private static String encodeQueryValue(String value) {
+        try {
+            return java.net.URLEncoder.encode(String.valueOf((Object) value), "UTF-8");
+        } catch (java.io.UnsupportedEncodingException e) {
+            throw new IllegalStateException("UTF-8 encoding is not available", e);
+        }
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
new file mode 100644
index 00000000..760d0831
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+/** Client-side wrapper around the {@code job/executor/*} REST endpoints (all sent as GET). */
+public class JobExecutorControllerWrapper extends SeatunnelWebTestingBase {
+
+    /** Calls {@code job/executor/execute} for the given job definition id. */
+    public Result<Long> jobExecutor(Long jobDefineId) {
+        String requestUrl = urlWithParam("job/executor/execute?jobDefineId=" + jobDefineId);
+        String body = sendRequest(requestUrl);
+        return JSONTestUtils.parseObject(body, new TypeReference<Result<Long>>() {});
+    }
+
+    /** Calls {@code job/executor/resource} for the given job definition id. */
+    public Result<Void> resource(Long jobDefineId) {
+        String requestUrl = urlWithParam("job/executor/resource?jobDefineId=" + jobDefineId);
+        return JSONTestUtils.parseObject(sendRequest(requestUrl), Result.class);
+    }
+
+    /** Calls {@code job/executor/pause} for the given job instance id. */
+    public Result<Void> jobPause(Long jobInstanceId) {
+        String requestUrl = urlWithParam("job/executor/pause?jobInstanceId=" + jobInstanceId);
+        return JSONTestUtils.parseObject(sendRequest(requestUrl), Result.class);
+    }
+
+    /** Calls {@code job/executor/restore} for the given job instance id. */
+    public Result<Void> jobRestore(Long jobInstanceId) {
+        String requestUrl = urlWithParam("job/executor/restore?jobInstanceId=" + jobInstanceId);
+        return JSONTestUtils.parseObject(sendRequest(requestUrl), Result.class);
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobMetricsControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobMetricsControllerWrapper.java
new file mode 100644
index 00000000..0204eded
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobMetricsControllerWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.response.metrics.JobDAG;
+import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.util.List;
+
+/**
+ * Test-side client for the {@code job/metrics} REST endpoints.
+ *
+ * <p>Every method builds its URL with {@code urlWithParam}, sends it through the single-argument
+ * {@code sendRequest} inherited from {@link SeatunnelWebTestingBase}, and parses the response
+ * text into a {@link Result} with {@link JSONTestUtils}.
+ */
+public class JobMetricsControllerWrapper extends SeatunnelWebTestingBase {
+
+    /**
+     * Calls {@code job/metrics/detail} for a job instance.
+     *
+     * @param jobInstanceId value sent as the {@code jobInstanceId} query parameter
+     * @return the parsed response, with its payload typed as a list of
+     *     {@link JobPipelineDetailMetricsRes}
+     */
+    public Result<List<JobPipelineDetailMetricsRes>> detail(Long jobInstanceId) {
+        String requestUrl = urlWithParam("job/metrics/detail?jobInstanceId=" + jobInstanceId);
+        String body = sendRequest(requestUrl);
+        TypeReference<Result<List<JobPipelineDetailMetricsRes>>> resultType =
+                new TypeReference<Result<List<JobPipelineDetailMetricsRes>>>() {};
+        return JSONTestUtils.parseObject(body, resultType);
+    }
+
+    /**
+     * Calls {@code job/metrics/dag} for a job instance.
+     *
+     * @param jobInstanceId value sent as the {@code jobInstanceId} query parameter
+     * @return the parsed response, with its payload typed as {@link JobDAG}
+     */
+    public Result<JobDAG> getJobDAG(Long jobInstanceId) {
+        String requestUrl = urlWithParam("job/metrics/dag?jobInstanceId=" + jobInstanceId);
+        String body = sendRequest(requestUrl);
+        TypeReference<Result<JobDAG>> resultType = new TypeReference<Result<JobDAG>>() {};
+        return JSONTestUtils.parseObject(body, resultType);
+    }
+
+    /**
+     * Calls {@code job/metrics/summary} for a job instance.
+     *
+     * @param jobInstanceId value sent as the {@code jobInstanceId} query parameter
+     * @return the response parsed through the raw {@code Result} class
+     */
+    public Result<Void> summary(Long jobInstanceId) {
+        String requestUrl = urlWithParam("job/metrics/summary?jobInstanceId=" + jobInstanceId);
+        String body = sendRequest(requestUrl);
+        return JSONTestUtils.parseObject(body, Result.class);
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java
new file mode 100644
index 00000000..a89581ca
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobTaskControllerWrapper.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.request.connector.SceneMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.DatabaseTableSchemaReq;
+import org.apache.seatunnel.app.domain.request.job.JobDAG;
+import org.apache.seatunnel.app.domain.request.job.JobTaskInfo;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.datasource.plugin.api.model.TableField;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test-side client for the job DAG and job task REST endpoints, plus helpers that build and save
+ * the fake-source, console-sink and replace-transform plugin configurations used by the tests.
+ */
+public class JobTaskControllerWrapper extends SeatunnelWebTestingBase {
+
+ /**
+ * Serializes {@code jobDAG} to JSON and POSTs it to {@code job/dag/<jobVersionId>}.
+ *
+ * @param jobVersionId id appended to the request path
+ * @param jobDAG DAG sent as the request body
+ * @return the parsed response, with its payload typed as {@link JobTaskCheckRes}
+ */
+ public Result<JobTaskCheckRes> saveJobDAG(long jobVersionId, JobDAG
jobDAG) {
+ String requestBody = JSONUtils.toPrettyJsonString(jobDAG);
+ String response = sendRequest(url("job/dag/" + jobVersionId),
requestBody, "POST");
+ return JSONTestUtils.parseObject(response, new
TypeReference<Result<JobTaskCheckRes>>() {});
+ }
+
+ /**
+ * Requests {@code job/<jobVersionId>} and parses the response.
+ *
+ * @param jobVersionId id appended to the request path
+ * @return the parsed response, with its payload typed as {@link JobTaskInfo}
+ */
+ public Result<JobTaskInfo> getJob(long jobVersionId) {
+ String response = sendRequest(url("job/" + jobVersionId));
+ return JSONTestUtils.parseObject(response, new
TypeReference<Result<JobTaskInfo>>() {});
+ }
+
+ /**
+ * Serializes {@code pluginConfig} to JSON and POSTs it to {@code job/task/<jobVersionId>}.
+ *
+ * @param jobVersionId id appended to the request path
+ * @param pluginConfig plugin configuration sent as the request body
+ * @return the response parsed through the raw {@code Result} class
+ */
+ public Result<Void> saveSingleTask(long jobVersionId, PluginConfig
pluginConfig) {
+ String requestBody = JSONUtils.toPrettyJsonString(pluginConfig);
+ String response = sendRequest(url("job/task/" + jobVersionId),
requestBody, "POST");
+ return JSONTestUtils.parseObject(response, Result.class);
+ }
+
+ /**
+ * Requests the task identified by {@code pluginId} under {@code job/task/<jobVersionId>}.
+ *
+ * <p>NOTE(review): "pluginId=" is appended directly after the value returned by url(...), with
+ * no '?' or '&' written here. Confirm that url(...) ends with the separator; otherwise the
+ * parameter is glued onto the path.
+ *
+ * @param jobVersionId id appended to the request path
+ * @param pluginId plugin id appended as {@code pluginId=<value>}
+ * @return the parsed response, with its payload typed as {@link PluginConfig}
+ */
+ public Result<PluginConfig> getSingleTask(long jobVersionId, String
pluginId) {
+ String response = sendRequest(url("job/task/" + jobVersionId) +
"pluginId=" + pluginId);
+ return JSONTestUtils.parseObject(response, new
TypeReference<Result<PluginConfig>>() {});
+ }
+
+ /**
+ * Sends a DELETE with a null body for the task identified by {@code pluginId} under
+ * {@code job/task/<jobVersionId>}.
+ *
+ * <p>NOTE(review): same URL construction as getSingleTask - confirm that url(...) supplies the
+ * separator placed before "pluginId=".
+ *
+ * @param jobVersionId id appended to the request path
+ * @param pluginId plugin id appended as {@code pluginId=<value>}
+ * @return the response parsed through the raw {@code Result} class
+ */
+ public Result<Void> deleteSingleTask(long jobVersionId, String pluginId) {
+ String response =
+ sendRequest(
+ url("job/task/" + jobVersionId) + "pluginId=" +
pluginId, null, "DELETE");
+ return JSONTestUtils.parseObject(response, Result.class);
+ }
+
+ /**
+ * Builds a SOURCE plugin configuration named "source-fakesource" for table
+ * fake_database.fake_table, saves it with saveSingleTask and asserts that the save succeeded.
+ *
+ * <p>The plugin id is "src_" followed by the current time in milliseconds.
+ *
+ * @param datasourceId datasource id; parsed with {@code Long.parseLong}
+ * @param jobVersionId job version the task is saved under
+ * @return the generated plugin id
+ */
+ public String createFakeSourcePlugin(String datasourceId, long
jobVersionId) {
+ DataSourceOption tableOption = new DataSourceOption();
+ tableOption.setDatabases(Arrays.asList("fake_database"));
+ tableOption.setTables(Arrays.asList("fake_table"));
+ String sourcePluginId = "src_" + System.currentTimeMillis();
+ PluginConfig sourcePluginConfig =
+ PluginConfig.builder()
+ .pluginId(sourcePluginId)
+ .name("source-fakesource")
+ .type(PluginType.SOURCE)
+ .tableOption(tableOption)
+ .selectTableFields(getSelectTableFields())
+ .transformOptions(null)
+ .outputSchema(getOutputSchema())
+ .dataSourceId(Long.parseLong(datasourceId))
+ .sceneMode(SceneMode.SINGLE_TABLE)
+ .config(
+
"{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n name =
\\\"string\\\"\\n age = \\\"int\\\"\\n
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.templat
[...]
+ .build();
+
+ Result<Void> srcResult = saveSingleTask(jobVersionId,
sourcePluginConfig);
+ assertTrue(srcResult.isSuccess());
+ return sourcePluginId;
+ }
+
+ /**
+ * Builds a SINK plugin configuration named "sink-console" for table
+ * console_fake_database.console_fake_table, saves it with saveSingleTask and asserts that the
+ * save succeeded.
+ *
+ * <p>The plugin id is "sink_" followed by the current time in milliseconds.
+ *
+ * @param datasourceId datasource id; parsed with {@code Long.parseLong}
+ * @param jobVersionId job version the task is saved under
+ * @return the generated plugin id
+ */
+ public String createConsoleSinkPlugin(String datasourceId, long
jobVersionId) {
+ DataSourceOption sinkTableOption = new DataSourceOption();
+ sinkTableOption.setDatabases(Arrays.asList("console_fake_database"));
+ sinkTableOption.setTables(Arrays.asList("console_fake_table"));
+
+ String sinkPluginId = "sink_" + System.currentTimeMillis();
+ PluginConfig sinkPluginConfig =
+ PluginConfig.builder()
+ .pluginId(sinkPluginId)
+ .name("sink-console")
+ .type(PluginType.SINK)
+ .tableOption(sinkTableOption)
+ .selectTableFields(getSelectTableFields())
+ .transformOptions(null)
+ .outputSchema(null)
+ .dataSourceId(Long.parseLong(datasourceId))
+ .sceneMode(SceneMode.SINGLE_TABLE)
+ .config("{\"query\":\"\"}")
+ .build();
+
+ Result<Void> sinkResult = saveSingleTask(jobVersionId,
sinkPluginConfig);
+ assertTrue(sinkResult.isSuccess());
+ return sinkPluginId;
+ }
+
+ /**
+ * Builds a TRANSFORM plugin configuration named "transform-replace" with connector type
+ * "Replace", saves it with saveSingleTask and asserts that the save succeeded.
+ *
+ * <p>The plugin id is "trans_" followed by the current time in milliseconds. No datasource or
+ * table option is set on this configuration.
+ *
+ * @param jobVersionId job version the task is saved under
+ * @return the generated plugin id
+ */
+ public String createReplaceTransformPlugin(long jobVersionId) {
+ String transPluginId = "trans_" + System.currentTimeMillis();
+ PluginConfig transformPluginConfig =
+ PluginConfig.builder()
+ .pluginId(transPluginId)
+ .name("transform-replace")
+ .type(PluginType.TRANSFORM)
+ .connectorType("Replace")
+ .transformOptions(null)
+ .outputSchema(null)
+ .sceneMode(SceneMode.SINGLE_TABLE)
+ .config(
+
"{\"query\":\"\",\"replace_field\":\"name\",\"pattern\":\"OK\",\"replacement\":\"ITS
OK.\",\"is_regex\":\"false\",\"replace_first\":null}")
+ .build();
+ Result<Void> transResult = saveSingleTask(jobVersionId,
transformPluginConfig);
+ assertTrue(transResult.isSuccess());
+ return transPluginId;
+ }
+
+ /**
+ * Returns a one-element list holding the schema of fake_database.fake_table, with the fields
+ * produced by createFields.
+ */
+ private List<DatabaseTableSchemaReq> getOutputSchema() {
+ DatabaseTableSchemaReq databaseTableSchemaReq = new
DatabaseTableSchemaReq();
+ databaseTableSchemaReq.setDatabase("fake_database");
+ databaseTableSchemaReq.setTableName("fake_table");
+ databaseTableSchemaReq.setFields(createFields());
+ return Arrays.asList(databaseTableSchemaReq);
+ }
+
+ /**
+ * Returns the two fields used by the fake table: "name" (type "string", primary key, output
+ * type "STRING") and "age" (type "int", not a primary key, output type "INT").
+ */
+ private List<TableField> createFields() {
+ List<TableField> fields = new ArrayList<>();
+ fields.add(
+ createTableField("string", "name", "", true, null, false,
null, false, "STRING"));
+ fields.add(createTableField("int", "age", "", false, null, false,
null, false, "INT"));
+ return fields;
+ }
+
+ /**
+ * Creates a {@link TableField} and copies each argument into the setter of the same name.
+ *
+ * @return the populated field
+ */
+ private TableField createTableField(
+ String type,
+ String name,
+ String comment,
+ Boolean primaryKey,
+ String defaultValue,
+ Boolean nullable,
+ Map<String, String> properties,
+ Boolean unSupport,
+ String outputDataType) {
+ TableField field = new TableField();
+ field.setType(type);
+ field.setName(name);
+ field.setComment(comment);
+ field.setPrimaryKey(primaryKey);
+ field.setDefaultValue(defaultValue);
+ field.setNullable(nullable);
+ field.setProperties(properties);
+ field.setUnSupport(unSupport);
+ field.setOutputDataType(outputDataType);
+ return field;
+ }
+
+ /**
+ * Returns a {@link SelectTableFields} with {@code all} set to true and the table fields "name"
+ * and "age".
+ */
+ private SelectTableFields getSelectTableFields() {
+ SelectTableFields selectTableFields = new SelectTableFields();
+ selectTableFields.setAll(true);
+ List<String> tableFields = Arrays.asList("name", "age");
+ selectTableFields.setTableFields(tableFields);
+ return selectTableFields;
+ }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
new file mode 100644
index 00000000..effca68d
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.request.datasource.DatasourceCheckReq;
+import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq;
+import org.apache.seatunnel.app.domain.response.PageInfo;
+import org.apache.seatunnel.app.domain.response.datasource.DatasourceDetailRes;
+import org.apache.seatunnel.app.domain.response.datasource.DatasourceRes;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test-side client for the {@code datasource} REST endpoints, plus helpers that create the
+ * FakeSource and Console datasources used by the tests.
+ */
+public class SeatunnelDatasourceControllerWrapper extends
SeatunnelWebTestingBase {
+
+ /**
+ * Creates a FakeSource datasource with the given name and asserts that creation succeeded.
+ *
+ * @param datasourceName name of the datasource to create
+ * @return the {@code data} value of the create response
+ */
+ public String createFakeSourceDatasource(String datasourceName) {
+ DatasourceReq req = getFakeSourceDatasourceReq(datasourceName);
+ Result<String> result = createDatasource(req);
+ assertTrue(result.isSuccess());
+ return result.getData();
+ }
+
+ /**
+ * Creates a Console datasource with the given name and asserts that creation succeeded.
+ *
+ * @param datasourceName name of the datasource to create
+ * @return the {@code data} value of the create response
+ */
+ public String createConsoleDatasource(String datasourceName) {
+ DatasourceReq req = getConsoleDatasourceReq(datasourceName);
+ Result<String> result = createDatasource(req);
+ assertTrue(result.isSuccess());
+ return result.getData();
+ }
+
+ /**
+ * Builds, without sending, a create request for plugin "FakeSource" whose description is
+ * the datasource name followed by " desc" and whose config declares a "name" string field and
+ * an "age" int field.
+ *
+ * @param datasourceName name placed in the request
+ * @return the populated request
+ */
+ public DatasourceReq getFakeSourceDatasourceReq(String datasourceName) {
+ DatasourceReq req = new DatasourceReq();
+ req.setDatasourceName(datasourceName);
+ req.setPluginName("FakeSource");
+ req.setDescription(datasourceName + " desc");
+ req.setDatasourceConfig(
+ "{\"fields\":\"{\\n \\\"name\\\": \\\"string\\\",\\n
\\\"age\\\": \\\"int\\\"\\n }\"}");
+ return req;
+ }
+
+ /**
+ * Builds, without sending, a create request for plugin "Console" whose description is the
+ * datasource name followed by " description".
+ *
+ * @param datasourceName name placed in the request
+ * @return the populated request
+ */
+ private DatasourceReq getConsoleDatasourceReq(String datasourceName) {
+ DatasourceReq req = new DatasourceReq();
+ req.setDatasourceName(datasourceName);
+ req.setPluginName("Console");
+ req.setDescription(datasourceName + " description");
+
req.setDatasourceConfig("{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"100\"}");
+ return req;
+ }
+
+ /**
+ * Serializes {@code req} to JSON and POSTs it to {@code datasource/create}.
+ *
+ * @param req create request sent as the body
+ * @return the parsed response, with its payload typed as {@code String}
+ */
+ public Result<String> createDatasource(DatasourceReq req) {
+ String requestBody = JSONUtils.toPrettyJsonString(req);
+ String response = sendRequest(url("datasource/create"), requestBody,
"POST");
+ return JSONTestUtils.parseObject(response, new
TypeReference<Result<String>>() {});
+ }
+
+ /**
+ * Serializes {@code req} to JSON and POSTs it to {@code datasource/check/connect}.
+ *
+ * @param req connectivity check request sent as the body
+ * @return the parsed response, with its payload typed as {@code Boolean}
+ */
+ public Result<Boolean> testConnect(DatasourceCheckReq req) {
+ String requestBody = JSONUtils.toPrettyJsonString(req);
+ String response = sendRequest(url("datasource/check/connect"),
requestBody, "POST");
+ return JSONTestUtils.parseObject(response, new
TypeReference<Result<Boolean>>() {});
+ }
+
+ /**
+ * Serializes {@code req} to JSON and PUTs it to {@code datasource/<id>}.
+ *
+ * @param id datasource id appended to the request path
+ * @param req update request sent as the body
+ * @return the parsed response, with its payload typed as {@code Boolean}
+ */
+ public Result<Boolean> updateDatasource(String id, DatasourceReq req) {
+ String requestBody = JSONUtils.toPrettyJsonString(req);
+ String response = sendRequest(url("datasource/" + id), requestBody,
"PUT");
+ return JSONTestUtils.parseObject(response, new
TypeReference<Result<Boolean>>() {});
+ }
+
+ /**
+ * Sends a DELETE with a null body to {@code datasource/<id>}.
+ *
+ * @param id datasource id appended to the request path
+ * @return the parsed response, with its payload typed as {@code Boolean}
+ */
+ public Result<Boolean> deleteDatasource(String id) {
+ String response = sendRequest(url("datasource/" + id), null, "DELETE");
+ return JSONTestUtils.parseObject(response, new
TypeReference<Result<Boolean>>() {});
+ }
+
+ /**
+ * Requests {@code datasource/<id>} and parses the response.
+ *
+ * @param id datasource id appended to the request path
+ * @return the parsed response, with its payload typed as {@link DatasourceDetailRes}
+ */
+ public Result<DatasourceDetailRes> getDatasource(String id) {
+ String response = sendRequest(url("datasource/" + id));
+ return JSONTestUtils.parseObject(
+ response, new TypeReference<Result<DatasourceDetailRes>>() {});
+ }
+
+ /**
+ * Requests {@code datasource/list} with the given search and paging values.
+ *
+ * <p>Unlike the other methods in this class, the URL is formatted from {@code baseUrl} directly
+ * instead of going through url(...).
+ *
+ * <p>NOTE(review): searchVal and pluginName are placed in the query string as-is, without URL
+ * encoding, and a null argument is formatted as the text "null".
+ *
+ * @param searchVal value of the {@code searchVal} query parameter
+ * @param pluginName value of the {@code pluginName} query parameter
+ * @param pageNo value of the {@code pageNo} query parameter
+ * @param pageSize value of the {@code pageSize} query parameter
+ * @return the parsed response, with its payload typed as a page of {@link DatasourceRes}
+ */
+ public Result<PageInfo<DatasourceRes>> getDatasourceList(
+ String searchVal, String pluginName, Integer pageNo, Integer
pageSize) {
+ String response =
+ sendRequest(
+ String.format(
+
"%s/datasource/list?searchVal=%s&pluginName=%s&pageNo=%d&pageSize=%d",
+ baseUrl, searchVal, pluginName, pageNo,
pageSize));
+ return JSONTestUtils.parseObject(
+ response, new TypeReference<Result<PageInfo<DatasourceRes>>>()
{});
+ }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java
new file mode 100644
index 00000000..af0cb556
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/UserControllerWrapper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.controller;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.request.user.AddUserReq;
+import org.apache.seatunnel.app.domain.request.user.UpdateUserReq;
+import org.apache.seatunnel.app.domain.response.user.AddUserRes;
+import org.apache.seatunnel.app.utils.JSONTestUtils;
+import org.apache.seatunnel.app.utils.JSONUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+/**
+ * Test-side client for the {@code user} REST endpoints.
+ *
+ * <p>Requests go through the {@code sendRequest} helpers inherited from
+ * {@link SeatunnelWebTestingBase}; responses are parsed into a {@link Result} with
+ * {@link JSONTestUtils}.
+ */
+public class UserControllerWrapper extends SeatunnelWebTestingBase {
+
+    /**
+     * Serializes the request to JSON and POSTs it to {@code user}.
+     *
+     * @param addUserReq request sent as the body
+     * @return the parsed response, with its payload typed as {@link AddUserRes}
+     */
+    public Result<AddUserRes> addUser(AddUserReq addUserReq) {
+        String payload = JSONUtils.toPrettyJsonString(addUserReq);
+        String body = sendRequest(url("user"), payload, "POST");
+        TypeReference<Result<AddUserRes>> resultType = new TypeReference<Result<AddUserRes>>() {};
+        return JSONTestUtils.parseObject(body, resultType);
+    }
+
+    /**
+     * Serializes the request to JSON and PUTs it to {@code user/<userId>}.
+     *
+     * @param userId id appended to the request path
+     * @param updateUserReq request sent as the body
+     * @return the response parsed through the raw {@code Result} class
+     */
+    public Result<Void> updateUser(String userId, UpdateUserReq updateUserReq) {
+        String payload = JSONUtils.toPrettyJsonString(updateUserReq);
+        String body = sendRequest(url("user/" + userId), payload, "PUT");
+        return JSONTestUtils.parseObject(body, Result.class);
+    }
+
+    /**
+     * Sends a DELETE with a null body to {@code user/<userId>}.
+     *
+     * @param userId id appended to the request path
+     * @return the response parsed through the raw {@code Result} class
+     */
+    public Result<Void> deleteUser(String userId) {
+        String body = sendRequest(url("user/" + userId), null, "DELETE");
+        return JSONTestUtils.parseObject(body, Result.class);
+    }
+
+    /**
+     * Requests {@code user} with paging parameters.
+     *
+     * @param pageNo value of the {@code pageNo} query parameter
+     * @param pageSize value of the {@code pageSize} query parameter
+     * @return the response parsed through the raw {@code Result} class
+     */
+    public Result<Void> listUsers(Integer pageNo, Integer pageSize) {
+        String requestUrl = urlWithParam("user?pageNo=" + pageNo + "&pageSize=" + pageSize);
+        String body = sendRequest(requestUrl);
+        return JSONTestUtils.parseObject(body, Result.class);
+    }
+
+    /**
+     * Sends a PATCH with a null body to {@code user/logout}.
+     *
+     * @return the response parsed through the raw {@code Result} class
+     */
+    public Result<Void> logout() {
+        String body = sendRequest(url("user/logout"), null, "PATCH");
+        return JSONTestUtils.parseObject(body, Result.class);
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/ConnectorInfoDeserializer.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/ConnectorInfoDeserializer.java
new file mode 100644
index 00000000..fb164e41
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/ConnectorInfoDeserializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.domain;
+
+import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+
+/**
+ * Jackson deserializer that rebuilds a {@link ConnectorInfo} from its JSON object form.
+ *
+ * <p>It reads the nested {@code pluginIdentifier} object and the {@code artifactId} field and
+ * passes both to the two-argument {@link ConnectorInfo} constructor.
+ */
+public class ConnectorInfoDeserializer extends JsonDeserializer<ConnectorInfo> {
+
+    /** Delegate for the nested {@code pluginIdentifier} object; it holds no state. */
+    private final PluginIdentifierDeserializer pluginIdentifierDeserializer =
+            new PluginIdentifierDeserializer();
+
+    /**
+     * Reads the current JSON value as a tree and converts it to a {@link ConnectorInfo}.
+     *
+     * @param jsonParser parser positioned at the value to read
+     * @param deserializationContext context handed on to the nested deserializer
+     * @return the connector info built from {@code pluginIdentifier} and {@code artifactId}
+     * @throws IOException if the JSON cannot be read, or if {@code pluginIdentifier} or
+     *     {@code artifactId} is absent from the value
+     */
+    @Override
+    public ConnectorInfo deserialize(
+            JsonParser jsonParser, DeserializationContext deserializationContext)
+            throws IOException, JsonProcessingException {
+        JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+        JsonNode pluginIdentifierNode = requireField(node, "pluginIdentifier");
+        String artifactId = requireField(node, "artifactId").asText();
+
+        // Turn the nested node back into a parser so the delegate can read it as a tree.
+        PluginIdentifier pluginIdentifier =
+                pluginIdentifierDeserializer.deserialize(
+                        pluginIdentifierNode.traverse(jsonParser.getCodec()),
+                        deserializationContext);
+
+        return new ConnectorInfo(pluginIdentifier, artifactId);
+    }
+
+    /**
+     * Returns the named child of {@code node}, failing with a message that names the field
+     * instead of letting a later call throw a bare NullPointerException.
+     */
+    private static JsonNode requireField(JsonNode node, String fieldName) throws IOException {
+        JsonNode value = node == null ? null : node.get(fieldName);
+        if (value == null) {
+            throw new IOException(
+                    "Cannot deserialize ConnectorInfo: missing field '" + fieldName + "'");
+        }
+        return value;
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/PluginIdentifierDeserializer.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/PluginIdentifierDeserializer.java
new file mode 100644
index 00000000..3d097aa4
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/domain/PluginIdentifierDeserializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.domain;
+
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+
+/**
+ * Jackson deserializer that rebuilds a {@link PluginIdentifier} from a JSON object with the
+ * fields {@code engineType}, {@code pluginType} and {@code pluginName}.
+ */
+public class PluginIdentifierDeserializer extends JsonDeserializer<PluginIdentifier> {
+
+    /**
+     * Reads the current JSON value as a tree and converts it with
+     * {@code PluginIdentifier.of(engineType, pluginType, pluginName)}.
+     *
+     * @param jsonParser parser positioned at the value to read
+     * @param deserializationContext unused
+     * @return the plugin identifier built from the three fields
+     * @throws IOException if the JSON cannot be read, or if one of the three fields is absent
+     */
+    @Override
+    public PluginIdentifier deserialize(
+            JsonParser jsonParser, DeserializationContext deserializationContext)
+            throws IOException, JsonProcessingException {
+        JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+        String engineType = requireText(node, "engineType");
+        String pluginType = requireText(node, "pluginType");
+        String pluginName = requireText(node, "pluginName");
+
+        return PluginIdentifier.of(engineType, pluginType, pluginName);
+    }
+
+    /**
+     * Returns the text of the named child of {@code node}, failing with a message that names the
+     * field instead of throwing a bare NullPointerException when it is absent.
+     */
+    private static String requireText(JsonNode node, String fieldName) throws IOException {
+        JsonNode value = node == null ? null : node.get(fieldName);
+        if (value == null) {
+            throw new IOException(
+                    "Cannot deserialize PluginIdentifier: missing field '" + fieldName + "'");
+        }
+        return value.asText();
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/ConnectorControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/ConnectorControllerTest.java
new file mode 100644
index 00000000..222f4863
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/ConnectorControllerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.ConnectorControllerWrapper;
+import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * End-to-end checks of the connector endpoints, driven through
+ * {@link ConnectorControllerWrapper}.
+ *
+ * <p>One {@link SeaTunnelWebCluster} is started before the first test and stopped after the last.
+ */
+public class ConnectorControllerTest {
+    private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster();
+    private static ConnectorControllerWrapper connectorControllerWrapper;
+
+    /** Starts the cluster, then creates the wrapper used by every test. */
+    @BeforeAll
+    public static void setUp() {
+        seaTunnelWebCluster.start();
+        connectorControllerWrapper = new ConnectorControllerWrapper();
+    }
+
+    /** Listing all transforms returns at least one entry. */
+    @Test
+    public void testListAllTransform() {
+        List<ConnectorInfo> listResult = connectorControllerWrapper.listAllTransform();
+        assertFalse(listResult.isEmpty());
+    }
+
+    /** Listing sources with status "ALL" returns at least one entry. */
+    @Test
+    public void testListSource() {
+        List<ConnectorInfo> result = connectorControllerWrapper.listSource("ALL");
+        assertFalse(result.isEmpty());
+    }
+
+    /** Listing sinks with status "ALL" returns at least one entry. */
+    @Test
+    public void testListSink() {
+        List<ConnectorInfo> result = connectorControllerWrapper.listSink("ALL");
+        assertFalse(result.isEmpty());
+    }
+
+    /** The sync call reports success. */
+    @Test
+    public void testSync() {
+        Result<Void> sync = connectorControllerWrapper.sync();
+        assertTrue(sync.isSuccess());
+    }
+
+    /** Fetching the form structure of the "FakeSource" source connector reports success. */
+    @Test
+    public void testGetConnectorFormStructure() {
+        Result<Void> connectorFormStructure =
+                connectorControllerWrapper.getConnectorFormStructure("source", "FakeSource");
+        assertTrue(connectorFormStructure.isSuccess());
+    }
+
+    /** Stops the cluster started in {@link #setUp()}. */
+    @AfterAll
+    public static void tearDown() {
+        seaTunnelWebCluster.stop();
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobConfigControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobConfigControllerTest.java
new file mode 100644
index 00000000..215d7a1d
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobConfigControllerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.JobConfigControllerWrapper;
+import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper;
+import org.apache.seatunnel.app.domain.request.job.JobConfig;
+import org.apache.seatunnel.app.domain.response.job.JobConfigRes;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * End-to-end checks of the job config endpoints, driven through
+ * {@link JobConfigControllerWrapper}; job definitions are created through
+ * {@link JobDefinitionControllerWrapper}.
+ *
+ * <p>One {@link SeaTunnelWebCluster} is started before the first test and stopped after the last.
+ */
+public class JobConfigControllerTest {
+    private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster();
+    private static JobConfigControllerWrapper jobConfigControllerWrapper;
+    private static JobDefinitionControllerWrapper jobDefinitionControllerWrapper;
+
+    /** Suffix appended to every job name; computed once when the class is loaded. */
+    private static final String uniqueId = "_" + System.currentTimeMillis();
+
+    /** Starts the cluster, then creates the wrappers used by every test. */
+    @BeforeAll
+    public static void setUp() {
+        seaTunnelWebCluster.start();
+        jobConfigControllerWrapper = new JobConfigControllerWrapper();
+        jobDefinitionControllerWrapper = new JobDefinitionControllerWrapper();
+    }
+
+    /** Updating the config of a freshly created job reports success. */
+    @Test
+    public void updateJobConfig_shouldReturnSuccess_whenValidRequest() {
+        String jobName = "config_job1" + uniqueId;
+        updateConfig(jobName);
+    }
+
+    /**
+     * Creates a job definition named {@code jobName}, updates its config with the object built
+     * by {@code populateJobConfigObject} and asserts that the update succeeded.
+     */
+    private static void updateConfig(String jobName) {
+        long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName);
+        JobConfig jobConfig = jobConfigControllerWrapper.populateJobConfigObject(jobName);
+        Result<Void> result = jobConfigControllerWrapper.updateJobConfig(jobId, jobConfig);
+        assertTrue(result.isSuccess());
+    }
+
+    /** Reading the config of a freshly created job returns data carrying the job name. */
+    @Test
+    public void getJobConfig_shouldReturnData_whenValidRequest() {
+        String jobName = "config_job2" + uniqueId;
+        long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName);
+        Result<JobConfigRes> result = jobConfigControllerWrapper.getJobConfig(jobId);
+        assertTrue(result.isSuccess());
+        assertNotNull(result.getData());
+        assertEquals(jobName, result.getData().getName());
+    }
+
+    /** Stops the cluster started in {@link #setUp()}. */
+    @AfterAll
+    public static void tearDown() {
+        seaTunnelWebCluster.stop();
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java
new file mode 100644
index 00000000..73404185
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobDefinitionControllerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper;
+import org.apache.seatunnel.app.domain.response.PageInfo;
+import org.apache.seatunnel.app.domain.response.job.JobDefinitionRes;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * End-to-end checks of the job definition endpoints, driven through
+ * {@link JobDefinitionControllerWrapper}.
+ *
+ * <p>One {@link SeaTunnelWebCluster} is started before the first test and stopped after the last.
+ */
+public class JobDefinitionControllerTest {
+    private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster();
+    private static JobDefinitionControllerWrapper jobDefinitionControllerWrapper;
+
+    /** Suffix appended to every job name; computed once when the class is loaded. */
+    private static final String uniqueId = "_" + System.currentTimeMillis();
+
+    /** Starts the cluster, then creates the wrapper used by every test. */
+    @BeforeAll
+    public static void setUp() {
+        seaTunnelWebCluster.start();
+        jobDefinitionControllerWrapper = new JobDefinitionControllerWrapper();
+    }
+
+    /** Creating a job definition yields a positive id. */
+    @Test
+    public void createJobDefinition_shouldReturnSuccess_whenValidRequest() {
+        long jobId = jobDefinitionControllerWrapper.createJobDefinition("job1" + uniqueId);
+        assertTrue(jobId > 0);
+    }
+
+    /** Fetching a job definition by id returns the name it was created with. */
+    @Test
+    public void getJobDefinitionById_shouldReturnData_whenValidRequest() {
+        String job2 = "job2" + uniqueId;
+        long jobId = jobDefinitionControllerWrapper.createJobDefinition(job2);
+        Result<JobDefinitionRes> result =
+                jobDefinitionControllerWrapper.getJobDefinitionById(jobId);
+        assertTrue(result.isSuccess());
+        assertEquals(job2, result.getData().getName());
+    }
+
+    /** Querying by the job name (page 1, size 10) returns exactly the job just created. */
+    @Test
+    public void getJobDefinition_shouldReturnData_whenValidRequest() {
+        String job3 = "job3" + uniqueId;
+        long jobId = jobDefinitionControllerWrapper.createJobDefinition(job3);
+        Result<PageInfo<JobDefinitionRes>> result =
+                jobDefinitionControllerWrapper.getJobDefinition(job3, 1, 10);
+        assertTrue(result.isSuccess());
+        assertEquals(1, result.getData().getData().size());
+        assertEquals(jobId, result.getData().getData().get(0).getId());
+    }
+
+    /** Deleting a freshly created job definition reports success. */
+    @Test
+    public void deleteJobDefinition_shouldReturnSuccess_whenValidId() {
+        String job7 = "job7" + uniqueId;
+        long jobId = jobDefinitionControllerWrapper.createJobDefinition(job7);
+        Result<Void> result = jobDefinitionControllerWrapper.deleteJobDefinition(jobId);
+        assertTrue(result.isSuccess());
+    }
+
+    /** Stops the cluster started in {@link #setUp()}. */
+    @AfterAll
+    public static void tearDown() {
+        seaTunnelWebCluster.stop();
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
new file mode 100644
index 00000000..4a01df28
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
+import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
+import org.apache.seatunnel.app.utils.JobUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * End-to-end checks for the job executor endpoints, driven through
+ * {@link JobExecutorControllerWrapper} against a cluster that is started once for the class.
+ */
+public class JobExecutorControllerTest {
+    private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster();
+    private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
+    /** Millisecond timestamp suffix appended to every job name created here. */
+    private static final String uniqueId = "_" + System.currentTimeMillis();
+
+    /** Starts the cluster, then creates the wrapper used by the tests. */
+    @BeforeAll
+    public static void setUp() {
+        seaTunnelWebCluster.start();
+        jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
+    }
+
+    /**
+     * Executes a job built by {@link JobUtils#createJob} and waits for it to complete. Expects a
+     * single pipeline in state {@code FINISHED} that read and wrote 5 rows.
+     */
+    @Test
+    public void executeJob_shouldReturnSuccess_whenValidRequest() {
+        long versionId = JobUtils.createJob("execJob" + uniqueId);
+
+        Result<Long> launch = jobExecutorControllerWrapper.jobExecutor(versionId);
+        assertTrue(launch.isSuccess());
+        assertTrue(launch.getData() > 0);
+
+        Result<List<JobPipelineDetailMetricsRes>> completion =
+                JobUtils.waitForJobCompletion(launch.getData());
+        List<JobPipelineDetailMetricsRes> pipelines = completion.getData();
+        assertEquals(1, pipelines.size());
+
+        JobPipelineDetailMetricsRes pipeline = pipelines.get(0);
+        assertEquals("FINISHED", pipeline.getStatus());
+        assertEquals(5, pipeline.getReadRowCount());
+        assertEquals(5, pipeline.getWriteRowCount());
+    }
+
+    /** Executes a job and then asks for a restore of the resulting instance; both must succeed. */
+    @Test
+    public void restoreJob_shouldReturnSuccess_whenValidRequest() {
+        long versionId = JobUtils.createJob("jobRestore" + uniqueId);
+
+        Result<Long> launch = jobExecutorControllerWrapper.jobExecutor(versionId);
+        assertTrue(launch.isSuccess());
+
+        Result<Void> restore = jobExecutorControllerWrapper.jobRestore(launch.getData());
+        assertTrue(restore.isSuccess());
+    }
+
+    /** Stops the cluster once every test has run. */
+    @AfterAll
+    public static void tearDown() {
+        seaTunnelWebCluster.stop();
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java
new file mode 100644
index 00000000..5086ba74
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
+import org.apache.seatunnel.app.controller.JobMetricsControllerWrapper;
+import org.apache.seatunnel.app.domain.response.metrics.JobDAG;
+import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
+import org.apache.seatunnel.app.utils.JobUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * End-to-end checks for the job metrics endpoints. Each test first executes a fresh job through
+ * {@link JobExecutorControllerWrapper} and then queries the metrics of the resulting instance.
+ */
+public class JobMetricsControllerTest {
+    private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster();
+    private static JobMetricsControllerWrapper jobMetricsControllerWrapper;
+    private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
+    /** Millisecond timestamp suffix appended to every job name created here. */
+    private static final String uniqueId = "_" + System.currentTimeMillis();
+
+    /** Starts the cluster, then creates the two wrappers used by the tests. */
+    @BeforeAll
+    public static void setUp() {
+        seaTunnelWebCluster.start();
+        jobMetricsControllerWrapper = new JobMetricsControllerWrapper();
+        jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
+    }
+
+    /** Detail metrics of an executed job must be returned successfully and must not be empty. */
+    @Test
+    public void detailMetrics_shouldReturnData_whenValidRequest() {
+        String jobName = "jobDetail" + uniqueId;
+        long jobInstanceId = executeJob(jobName);
+        Result<List<JobPipelineDetailMetricsRes>> result =
+                jobMetricsControllerWrapper.detail(jobInstanceId);
+        assertTrue(result.isSuccess());
+        assertFalse(result.getData().isEmpty());
+    }
+
+    /**
+     * Creates a job via {@link JobUtils#createJob}, submits it to the executor and returns the
+     * data of the executor response.
+     *
+     * <p>The returned id is asserted to be non-null because every caller unboxes it into a
+     * {@code long}; without the check a missing id would surface as a bare
+     * {@link NullPointerException} instead of a descriptive test failure.
+     *
+     * @param jobName name of the job to create and execute
+     * @return the non-null value carried by the executor response
+     */
+    private static Long executeJob(String jobName) {
+        Long jobVersionId = JobUtils.createJob(jobName);
+        Result<Long> jobExecutionResult = jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+        assertTrue(jobExecutionResult.isSuccess());
+        Long jobInstanceId = jobExecutionResult.getData();
+        assertNotNull(jobInstanceId, "job executor returned no job instance id");
+        return jobInstanceId;
+    }
+
+    /** The DAG of an executed job must be returned successfully and must not be null. */
+    @Test
+    public void getJobDAG_shouldReturnData_whenValidRequest() {
+        String jobName = "jobDAG" + uniqueId;
+        long jobInstanceId = executeJob(jobName);
+        Result<JobDAG> result = jobMetricsControllerWrapper.getJobDAG(jobInstanceId);
+        assertTrue(result.isSuccess());
+        assertNotNull(result.getData());
+    }
+
+    /** The summary metrics request for an executed job must report success. */
+    @Test
+    public void summaryMetrics_shouldReturnData_whenValidRequest() {
+        String jobName = "jobSummary" + uniqueId;
+        long jobInstanceId = executeJob(jobName);
+        Result<Void> result = jobMetricsControllerWrapper.summary(jobInstanceId);
+        assertTrue(result.isSuccess());
+    }
+
+    /** Stops the cluster once every test has run. */
+    @AfterAll
+    public static void tearDown() {
+        seaTunnelWebCluster.stop();
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobTaskControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobTaskControllerTest.java
new file mode 100644
index 00000000..c183f786
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobTaskControllerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.JobConfigControllerWrapper;
+import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper;
+import org.apache.seatunnel.app.controller.JobTaskControllerWrapper;
+import
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
+import org.apache.seatunnel.app.domain.request.job.Edge;
+import org.apache.seatunnel.app.domain.request.job.JobConfig;
+import org.apache.seatunnel.app.domain.request.job.JobDAG;
+import org.apache.seatunnel.app.domain.request.job.JobTaskInfo;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
+import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * End-to-end checks for the job task endpoints (job lookup, single-task CRUD and DAG save),
+ * driven through the controller wrappers against a cluster that is started once for the class.
+ */
+public class JobTaskControllerTest {
+    private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster();
+    private static JobTaskControllerWrapper jobTaskControllerWrapper;
+    private static JobDefinitionControllerWrapper jobDefinitionControllerWrapper;
+    private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper;
+    private static JobConfigControllerWrapper jobConfigControllerWrapper;
+    /**
+     * Millisecond timestamp suffix appended to every job and datasource name created here.
+     * Assigned once and never changed, hence final.
+     */
+    private static final String uniqueId = "_" + System.currentTimeMillis();
+
+    /** Starts the cluster, then creates the wrappers used by the tests. */
+    @BeforeAll
+    public static void setUp() {
+        seaTunnelWebCluster.start();
+        jobTaskControllerWrapper = new JobTaskControllerWrapper();
+        jobDefinitionControllerWrapper = new JobDefinitionControllerWrapper();
+        seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper();
+        jobConfigControllerWrapper = new JobConfigControllerWrapper();
+    }
+
+    /** Fetching the task info of a freshly created job definition must succeed. */
+    @Test
+    public void getJob_shouldReturnData_whenValidRequest() {
+        long jobId = jobDefinitionControllerWrapper.createJobDefinition("task_job1" + uniqueId);
+        Result<JobTaskInfo> result = jobTaskControllerWrapper.getJob(jobId);
+        assertTrue(result.isSuccess());
+    }
+
+    /**
+     * Builds a source -> transform -> sink job (fake source, replace transform, console sink),
+     * updates its job config and saves the DAG; the DAG save must succeed.
+     */
+    @Test
+    public void saveSingleTask_shouldReturnSuccess_whenValidRequest() {
+        String jobName = "task_job2" + uniqueId;
+        long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName);
+        String sourceDatasourceId =
+                seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(
+                        "task_db2_source" + uniqueId);
+        String sinkDataSourceId =
+                seatunnelDatasourceControllerWrapper.createConsoleDatasource(
+                        "task_db2_sink" + uniqueId);
+        String sourcePluginId =
+                jobTaskControllerWrapper.createFakeSourcePlugin(sourceDatasourceId, jobId);
+        String sinkPluginId =
+                jobTaskControllerWrapper.createConsoleSinkPlugin(sinkDataSourceId, jobId);
+        String transPluginId = jobTaskControllerWrapper.createReplaceTransformPlugin(jobId);
+
+        JobConfig jobConfig = jobConfigControllerWrapper.populateJobConfigObject(jobName);
+        jobConfigControllerWrapper.updateJobConfig(jobId, jobConfig);
+
+        // Wire the three plugins into a linear pipeline: source -> transform -> sink.
+        JobDAG jobDAG = new JobDAG();
+        List<Edge> edges = new ArrayList<>();
+        edges.add(new Edge(sourcePluginId, transPluginId));
+        edges.add(new Edge(transPluginId, sinkPluginId));
+        jobDAG.setEdges(edges);
+
+        Result<JobTaskCheckRes> dagResult = jobTaskControllerWrapper.saveJobDAG(jobId, jobDAG);
+        assertTrue(dagResult.isSuccess());
+    }
+
+    /** Fetching a fake-source plugin that was just attached to a job must succeed. */
+    @Test
+    public void getSingleTask_shouldReturnData_whenValidRequest() {
+        String jobName = "task_job3" + uniqueId;
+        long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName);
+        String datasourceName = "task_job1_db3" + uniqueId;
+        String datasourceId =
+                seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(datasourceName);
+        String sourcePluginId = jobTaskControllerWrapper.createFakeSourcePlugin(datasourceId, jobId);
+        Result<PluginConfig> result = jobTaskControllerWrapper.getSingleTask(jobId, sourcePluginId);
+        assertTrue(result.isSuccess());
+    }
+
+    /** Deleting a fake-source plugin that was just attached to a job must succeed. */
+    @Test
+    public void deleteSingleTask_shouldReturnSuccess_whenValidRequest() {
+        String jobName = "task_job7" + uniqueId;
+        long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName);
+        String datasourceName = "task_job1_db4" + uniqueId;
+        String datasourceId =
+                seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(datasourceName);
+        String sourcePluginId = jobTaskControllerWrapper.createFakeSourcePlugin(datasourceId, jobId);
+        Result<Void> result = jobTaskControllerWrapper.deleteSingleTask(jobId, sourcePluginId);
+        assertTrue(result.isSuccess());
+    }
+
+    /** Stops the cluster once every test has run. */
+    @AfterAll
+    public static void tearDown() {
+        seaTunnelWebCluster.stop();
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/SeatunnelDatasourceControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/SeatunnelDatasourceControllerTest.java
new file mode 100644
index 00000000..b928e0cc
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/SeatunnelDatasourceControllerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
+import org.apache.seatunnel.app.domain.request.datasource.DatasourceCheckReq;
+import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq;
+import org.apache.seatunnel.app.domain.response.PageInfo;
+import org.apache.seatunnel.app.domain.response.datasource.DatasourceDetailRes;
+import org.apache.seatunnel.app.domain.response.datasource.DatasourceRes;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * End-to-end checks for the datasource endpoints (create, connection test, update, delete,
+ * detail and list), driven through {@link SeatunnelDatasourceControllerWrapper} against a
+ * cluster that is started once for the class.
+ */
+public class SeatunnelDatasourceControllerTest {
+    private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster();
+    private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper;
+    /**
+     * Millisecond timestamp suffix appended to every datasource name created here. Assigned once
+     * and never changed, hence final.
+     */
+    private static final String uniqueId = "_" + System.currentTimeMillis();
+
+    /** Starts the cluster, then creates the wrapper used by the tests. */
+    @BeforeAll
+    public static void setUp() {
+        seaTunnelWebCluster.start();
+        seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper();
+    }
+
+    /** Creating a fake-source datasource must return a non-empty id. */
+    @Test
+    public void createDatasource_shouldReturnSuccess() {
+        String datasourceId =
+                seatunnelDatasourceControllerWrapper.createFakeSourceDatasource("ds1" + uniqueId);
+        assertTrue(!datasourceId.isEmpty());
+    }
+
+    /** A connection test for a FakeSource plugin with a two-field schema must succeed. */
+    @Test
+    public void testConnect_shouldReturnSuccess() {
+        DatasourceCheckReq req = new DatasourceCheckReq();
+        req.setPluginName("FakeSource");
+        Map<String, String> datasourceConfig = new HashMap<>();
+        datasourceConfig.put("fields", "{\"name\" : \"string\", \"age\" : \"int\"}");
+        req.setDatasourceConfig(datasourceConfig);
+        Result<Boolean> result = seatunnelDatasourceControllerWrapper.testConnect(req);
+        assertTrue(result.isSuccess());
+    }
+
+    /**
+     * Updates the description of an existing datasource and verifies that a subsequent read
+     * returns the new description.
+     */
+    @Test
+    public void updateDatasource_shouldReturnSuccess() {
+        String datasourceId =
+                seatunnelDatasourceControllerWrapper.createFakeSourceDatasource("ds2" + uniqueId);
+        Result<DatasourceDetailRes> result =
+                seatunnelDatasourceControllerWrapper.getDatasource(datasourceId);
+        assertNotNull(result.getData());
+        // Only the description is set on the update request.
+        DatasourceReq req = new DatasourceReq();
+        req.setDescription("new Description");
+        Result<Boolean> updateResult =
+                seatunnelDatasourceControllerWrapper.updateDatasource(datasourceId, req);
+        assertTrue(updateResult.isSuccess());
+        result = seatunnelDatasourceControllerWrapper.getDatasource(datasourceId);
+        assertEquals(req.getDescription(), result.getData().getDescription());
+    }
+
+    /** Deleting a datasource that was just created must succeed. */
+    @Test
+    public void deleteDatasource_shouldReturnSuccess() {
+        String id =
+                seatunnelDatasourceControllerWrapper.createFakeSourceDatasource("ds3" + uniqueId);
+        Result<Boolean> result = seatunnelDatasourceControllerWrapper.deleteDatasource(id);
+        assertTrue(result.isSuccess());
+    }
+
+    /** Reading the detail of a console datasource that was just created must succeed. */
+    @Test
+    public void getDatasourceDetail_shouldReturnSuccess() {
+        String id = seatunnelDatasourceControllerWrapper.createConsoleDatasource("ds4" + uniqueId);
+        Result<DatasourceDetailRes> result = seatunnelDatasourceControllerWrapper.getDatasource(id);
+        assertTrue(result.isSuccess());
+    }
+
+    /**
+     * Listing FakeSource datasources by exact name (page 1, size 10) must return exactly one
+     * entry whose id matches the datasource created by this test.
+     */
+    @Test
+    public void getDatasourceDetailByName_shouldReturnSuccess() {
+        String datasourceName = "ds5" + uniqueId;
+        String id = seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(datasourceName);
+        Result<PageInfo<DatasourceRes>> datasourceList =
+                seatunnelDatasourceControllerWrapper.getDatasourceList(
+                        datasourceName, "FakeSource", 1, 10);
+        assertTrue(datasourceList.isSuccess());
+        assertNotNull(datasourceList.getData());
+        assertEquals(1, datasourceList.getData().getData().size());
+        assertEquals(id, datasourceList.getData().getData().get(0).getId());
+    }
+
+    /**
+     * Creating a second datasource with a name that is already taken must fail with code
+     * {@code 60004}.
+     */
+    @Test
+    public void createDatasource_shouldFailIfDuplicate() {
+        String datasourceName = "ds6" + uniqueId;
+        String datasourceId =
+                seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(datasourceName);
+        assertTrue(!datasourceId.isEmpty());
+
+        DatasourceReq req =
+                seatunnelDatasourceControllerWrapper.getFakeSourceDatasourceReq(datasourceName);
+        Result<String> result = seatunnelDatasourceControllerWrapper.createDatasource(req);
+        assertTrue(result.isFailed());
+        assertEquals(60004, result.getCode());
+    }
+
+    /** Stops the cluster once every test has run. */
+    @AfterAll
+    public static void tearDown() {
+        seaTunnelWebCluster.stop();
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/UserControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/UserControllerTest.java
new file mode 100644
index 00000000..f6e72857
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/UserControllerTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.test;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
+import org.apache.seatunnel.app.controller.UserControllerWrapper;
+import org.apache.seatunnel.app.domain.request.user.AddUserReq;
+import org.apache.seatunnel.app.domain.request.user.UpdateUserReq;
+import org.apache.seatunnel.app.domain.response.user.AddUserRes;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * End-to-end checks for the user endpoints (add, update, delete, list and logout), driven
+ * through {@link UserControllerWrapper} against a cluster that is started once for the class.
+ */
+public class UserControllerTest {
+    private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster();
+    private static UserControllerWrapper userControllerWrapper;
+    /**
+     * Millisecond timestamp suffix appended to every user name created here. Assigned once and
+     * never changed, hence final.
+     */
+    private static final String uniqueId = "_" + System.currentTimeMillis();
+
+    /** Starts the cluster, then creates the wrapper used by the tests. */
+    @BeforeAll
+    public static void setUp() {
+        seaTunnelWebCluster.start();
+        userControllerWrapper = new UserControllerWrapper();
+    }
+
+    /** Adding a user must succeed and return a positive id. */
+    @Test
+    public void addUser_shouldReturnSuccess_whenValidRequest() {
+        String user = "addUser" + uniqueId;
+        AddUserReq addUserReq = getAddUserReq(user, "pass1");
+        Result<AddUserRes> result = userControllerWrapper.addUser(addUserReq);
+        assertTrue(result.isSuccess());
+        assertTrue(result.getData().getId() > 0);
+    }
+
+    /**
+     * Builds an add-user request with the given credentials; status and type are both set to 0.
+     *
+     * @param user user name to register
+     * @param pass password for the new user
+     * @return the populated request
+     */
+    private static AddUserReq getAddUserReq(String user, String pass) {
+        AddUserReq addUserReq = new AddUserReq();
+        addUserReq.setUsername(user);
+        addUserReq.setPassword(pass);
+        addUserReq.setStatus((byte) 0);
+        addUserReq.setType((byte) 0);
+        return addUserReq;
+    }
+
+    /** Adds a user and then updates its password through the update endpoint. */
+    @Test
+    public void updateUser_shouldReturnSuccess_whenValidRequest() {
+        String user = "updateUser" + uniqueId;
+        AddUserReq addUserReq = getAddUserReq(user, "pass2");
+        Result<AddUserRes> result = userControllerWrapper.addUser(addUserReq);
+        assertTrue(result.isSuccess());
+        UpdateUserReq updateUserReq = new UpdateUserReq();
+        updateUserReq.setUsername(user);
+        updateUserReq.setUserId(result.getData().getId());
+        updateUserReq.setPassword("pass3");
+        updateUserReq.setStatus((byte) 0);
+        updateUserReq.setType((byte) 0);
+        Result<Void> updateUserResult =
+                userControllerWrapper.updateUser(
+                        String.valueOf(updateUserReq.getUserId()), updateUserReq);
+        assertTrue(updateUserResult.isSuccess());
+    }
+
+    /** Adds a user and then deletes it by id; both calls must succeed. */
+    @Test
+    public void deleteUser_shouldReturnSuccess_whenValidUserId() {
+        String user = "deleteUser" + uniqueId;
+        AddUserReq addUserReq = getAddUserReq(user, "pass3");
+        Result<AddUserRes> result = userControllerWrapper.addUser(addUserReq);
+        assertTrue(result.isSuccess());
+        Result<Void> deleteUserResult =
+                userControllerWrapper.deleteUser(String.valueOf(result.getData().getId()));
+        assertTrue(deleteUserResult.isSuccess());
+    }
+
+    /** Listing users (page 1, size 10) must succeed and carry non-null data. */
+    @Test
+    public void listUsers_shouldReturnUsers_whenUsersExist() {
+        Result<Void> result = userControllerWrapper.listUsers(1, 10);
+        assertTrue(result.isSuccess());
+        assertNotNull(result.getData());
+    }
+
+    /**
+     * Logs out and stops the cluster. The stop call sits in a {@code finally} block so that the
+     * cluster is released even when the logout call throws or its assertion fails.
+     */
+    @AfterAll
+    public static void tearDown() {
+        try {
+            Result<Void> logout = userControllerWrapper.logout();
+            assertTrue(logout.isSuccess());
+        } finally {
+            seaTunnelWebCluster.stop();
+        }
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JSONTestUtils.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JSONTestUtils.java
new file mode 100644
index 00000000..cfd29d02
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JSONTestUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.utils;
+
+import org.apache.seatunnel.app.common.Constants;
+import org.apache.seatunnel.app.domain.ConnectorInfoDeserializer;
+import org.apache.seatunnel.app.domain.PluginIdentifierDeserializer;
+import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+import javax.annotation.Nullable;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.TimeZone;
+
+import static
com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static
com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static
com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static
com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+/**
+ * JSON helpers for the integration tests, all backed by one shared, pre-configured Jackson
+ * {@link ObjectMapper}. Parse failures are logged and reported to the caller as {@code null}
+ * (or an empty list) rather than thrown.
+ */
+public class JSONTestUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(JSONTestUtils.class);
+
+    static {
+        logger.info("init timezone: {}", TimeZone.getDefault());
+    }
+
+    /**
+     * Shared mapper, built once and reused by every helper. It ignores unknown properties,
+     * accepts empty arrays as null objects, reads unknown enum values as null, requires setters
+     * for getters, does not fail on empty beans, and uses the JVM default time zone together
+     * with the {@link Constants#YYYY_MM_DD_HH_MM_SS} date format.
+     */
+    private static final ObjectMapper objectMapper =
+            JsonMapper.builder()
+                    .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+                    .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+                    .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+                    .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+                    .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
+                    .addModule(new JavaTimeModule())
+                    .defaultTimeZone(TimeZone.getDefault())
+                    .defaultDateFormat(new SimpleDateFormat(Constants.YYYY_MM_DD_HH_MM_SS))
+                    .defaultPrettyPrinter(new DefaultPrettyPrinter())
+                    .build();
+
+    // Register the test-side deserializers for the two response types that need custom handling.
+    static {
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(PluginIdentifier.class, new PluginIdentifierDeserializer());
+        module.addDeserializer(ConnectorInfo.class, new ConnectorInfoDeserializer());
+        objectMapper.registerModule(module);
+    }
+
+    /**
+     * Deserializes the given JSON into an instance of the given class.
+     *
+     * <p>Not suitable when the target itself is a generic type: because of type erasure the
+     * class token carries no type arguments. Generic fields inside the target are fine. Use
+     * {@link #parseObject(String, TypeReference)} for generic targets.
+     *
+     * @param json the string from which the object is to be deserialized
+     * @param clazz the class of T
+     * @param <T> the target type
+     * @return the deserialized object, or {@code null} when {@code json} is empty or cannot be
+     *     parsed (the failure is logged)
+     */
+    public static @Nullable <T> T parseObject(String json, Class<T> clazz) {
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        try {
+            return objectMapper.readValue(json, clazz);
+        } catch (Exception e) {
+            logger.error("parse object exception! json: {}", json, e);
+        }
+        return null;
+    }
+
+    /**
+     * Deserializes a JSON array into a list of the given element type.
+     *
+     * @param json json string
+     * @param clazz element class
+     * @param <T> the element type
+     * @return the parsed list, or an empty list when {@code json} is empty or cannot be parsed
+     *     (the failure is logged)
+     */
+    public static <T> List<T> toList(String json, Class<T> clazz) {
+        if (StringUtils.isEmpty(json)) {
+            return Collections.emptyList();
+        }
+        try {
+            CollectionType listType =
+                    objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, clazz);
+            return objectMapper.readValue(json, listType);
+        } catch (Exception e) {
+            logger.error("parse list exception! json: {}", json, e);
+        }
+
+        return Collections.emptyList();
+    }
+
+    /**
+     * Deserializes the given JSON into the (possibly generic) type described by a type reference.
+     *
+     * @param json json string
+     * @param type type reference describing the full target type
+     * @param <T> the target type
+     * @return the deserialized object, or {@code null} when {@code json} is empty or cannot be
+     *     parsed (the failure is logged)
+     */
+    public static @Nullable <T> T parseObject(String json, TypeReference<T> type) {
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        try {
+            return objectMapper.readValue(json, type);
+        } catch (Exception e) {
+            // The target is an arbitrary type reference, not necessarily a map.
+            logger.error("parse object by type reference exception! json: {}", json, e);
+        }
+
+        return null;
+    }
+}
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
new file mode 100644
index 00000000..0a530d3b
--- /dev/null
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.utils;
+
+import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.controller.JobConfigControllerWrapper;
+import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper;
+import org.apache.seatunnel.app.controller.JobMetricsControllerWrapper;
+import org.apache.seatunnel.app.controller.JobTaskControllerWrapper;
+import
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
+import org.apache.seatunnel.app.domain.request.job.Edge;
+import org.apache.seatunnel.app.domain.request.job.JobConfig;
+import org.apache.seatunnel.app.domain.request.job.JobDAG;
+import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes;
+import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Static helpers for the REST API integration tests: build a simple three-node job through the
+ * controller wrappers and poll the job metrics endpoint until a job instance has finished.
+ */
+public class JobUtils {
+    private static final JobMetricsControllerWrapper jobMetricsControllerWrapper =
+            new JobMetricsControllerWrapper();
+    private static final JobConfigControllerWrapper jobConfigControllerWrapper =
+            new JobConfigControllerWrapper();
+    private static final JobDefinitionControllerWrapper jobDefinitionControllerWrapper =
+            new JobDefinitionControllerWrapper();
+    private static final JobTaskControllerWrapper jobTaskControllerWrapper =
+            new JobTaskControllerWrapper();
+    private static final SeatunnelDatasourceControllerWrapper
+            seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper();
+
+    /** Default maximum time, in seconds, to wait for a job to complete (1 minute). */
+    private static final long TIMEOUT = 60;
+
+    /** Default pause, in seconds, between two polls of the job metrics. */
+    private static final long INTERVAL = 2;
+
+    /**
+     * Waits for a job instance to finish using the default timeout and poll interval.
+     *
+     * @param jobInstanceId id of the job instance to watch
+     * @return the metrics result observed when every pipeline had finished
+     * @throws RuntimeException if the metrics call fails, the timeout elapses, or the waiting
+     *     thread is interrupted
+     */
+    public static Result<List<JobPipelineDetailMetricsRes>> waitForJobCompletion(
+            long jobInstanceId) {
+        return waitForJobCompletion(jobInstanceId, TIMEOUT, INTERVAL);
+    }
+
+    /**
+     * Polls the job detail metrics until every pipeline reports a terminal status.
+     *
+     * @param jobInstanceId id of the job instance to watch
+     * @param timeout maximum time to wait, in seconds
+     * @param interval pause between two polls, in seconds
+     * @return the metrics result observed when every pipeline had finished
+     * @throws RuntimeException if the metrics call fails, the timeout elapses, or the waiting
+     *     thread is interrupted
+     */
+    public static Result<List<JobPipelineDetailMetricsRes>> waitForJobCompletion(
+            long jobInstanceId, long timeout, long interval) {
+        long startTime = System.currentTimeMillis();
+        long timeoutMillis = TimeUnit.SECONDS.toMillis(timeout);
+        while (true) {
+            Result<List<JobPipelineDetailMetricsRes>> detail =
+                    jobMetricsControllerWrapper.detail(jobInstanceId);
+            if (!detail.isSuccess()) {
+                throw new RuntimeException("Failed to get job detail metrics");
+            }
+            if (isAllFinished(detail.getData())) {
+                return detail;
+            }
+            // The timeout is checked after each poll, so at least one poll always happens.
+            if (System.currentTimeMillis() - startTime > timeoutMillis) {
+                throw new RuntimeException(
+                        "Timeout waiting for job to complete. Job not completed in "
+                                + timeout
+                                + " seconds.");
+            }
+            try {
+                TimeUnit.SECONDS.sleep(interval);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag before surfacing the failure.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(
+                        "Thread was interrupted while waiting for job completion", e);
+            }
+        }
+    }
+
+    /**
+     * Tells whether every pipeline in the list has reached a terminal status.
+     *
+     * @param details pipeline metrics returned by the metrics endpoint; may be {@code null}
+     * @return {@code false} if {@code details} is {@code null} or any entry is unfinished;
+     *     {@code true} otherwise (including for an empty list)
+     */
+    private static boolean isAllFinished(List<JobPipelineDetailMetricsRes> details) {
+        if (details == null) {
+            // No metrics payload: nothing shows the job has finished, so keep polling
+            // instead of failing with a NullPointerException.
+            return false;
+        }
+        for (JobPipelineDetailMetricsRes metrics : details) {
+            if (!isFinished(metrics)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Tells whether a single pipeline has reached a terminal status.
+     *
+     * @param metrics metrics of one pipeline; may be {@code null}
+     * @return {@code true} only when the status is FINISHED, CANCELED or FAILED
+     */
+    private static boolean isFinished(JobPipelineDetailMetricsRes metrics) {
+        if (metrics == null || metrics.getStatus() == null) {
+            return false;
+        }
+        switch (metrics.getStatus()) {
+            case "FINISHED":
+            case "CANCELED":
+            case "FAILED":
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Creates a job definition named {@code jobName}, updates its job config, creates a source
+     * and a console datasource, adds a source, a transform and a sink plugin, and saves a DAG
+     * that links them as source -> transform -> sink.
+     *
+     * @param jobName name of the job; also used as the suffix of the datasource names
+     * @return the job version id, which this helper takes to be equal to the job id
+     */
+    public static Long createJob(String jobName) {
+        Long jobId = jobDefinitionControllerWrapper.createJobDefinition(jobName);
+        JobConfig jobConfig = jobConfigControllerWrapper.populateJobConfigObject(jobName);
+        // jobVersionId is same as jobId
+        long jobVersionId = jobId;
+
+        Result<Void> jobConfigResult =
+                jobConfigControllerWrapper.updateJobConfig(jobVersionId, jobConfig);
+        assertTrue(jobConfigResult.isSuccess());
+
+        String fakeSourceDatasourceId =
+                seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(
+                        "source_" + jobName);
+        String consoleDatasourceId =
+                seatunnelDatasourceControllerWrapper.createConsoleDatasource(
+                        "console_" + jobName);
+
+        String sourcePluginId =
+                jobTaskControllerWrapper.createFakeSourcePlugin(
+                        fakeSourceDatasourceId, jobVersionId);
+        String transPluginId = jobTaskControllerWrapper.createReplaceTransformPlugin(jobVersionId);
+        String sinkPluginId =
+                jobTaskControllerWrapper.createConsoleSinkPlugin(
+                        consoleDatasourceId, jobVersionId);
+
+        JobDAG jobDAG = new JobDAG();
+        List<Edge> edges = new ArrayList<>();
+        edges.add(new Edge(sourcePluginId, transPluginId));
+        edges.add(new Edge(transPluginId, sinkPluginId));
+        jobDAG.setEdges(edges);
+
+        Result<JobTaskCheckRes> jobTaskCheckResResult =
+                jobTaskControllerWrapper.saveJobDAG(jobVersionId, jobDAG);
+        assertTrue(jobTaskCheckResResult.isSuccess());
+        return jobVersionId;
+    }
+}
diff --git a/seatunnel-web-it/src/test/resources/application.yml
b/seatunnel-web-it/src/test/resources/application.yml
new file mode 100644
index 00000000..3b49d2a3
--- /dev/null
+++ b/seatunnel-web-it/src/test/resources/application.yml
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+server:
+ port: 8802
+
+spring:
+ application:
+ name: seatunnel
+ jackson:
+ date-format: yyyy-MM-dd HH:mm:ss
+ datasource:
+ driver-class-name: com.mysql.cj.jdbc.Driver
+ url:
jdbc:mysql://localhost:3306/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true
+ username: seatunnel_user
+ password: seaTunnel_1234
+ mvc:
+ pathmatch:
+ matching-strategy: ant_path_matcher
+
+jwt:
+ expireTime: 86400
+ # please set your own secret key when deploying
+ secretKey: https://github.com/apache/seatunnel
+ algorithm: HS256
+
+---
+spring:
+ config:
+ activate:
+ on-profile: h2
+ sql:
+ init:
+ schema-locations: classpath*:script/seatunnel_server_h2.sql
+ datasource:
+ driver-class-name: org.h2.Driver
+ url:
jdbc:h2:mem:seatunnel;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true
+ username: sa
+ password: sa
+ h2:
+ console:
+ enabled: true
+ path: /h2
+ settings:
+ trace: false
+ web-allow-others: false
\ No newline at end of file
diff --git a/seatunnel-web-it/src/test/resources/hazelcast-client.yaml
b/seatunnel-web-it/src/test/resources/hazelcast-client.yaml
new file mode 100644
index 00000000..d3b7f3d7
--- /dev/null
+++ b/seatunnel-web-it/src/test/resources/hazelcast-client.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast-client:
+ cluster-name: seatunnel-test
+ properties:
+ hazelcast.logging.type: log4j2
+ connection-strategy:
+ connection-retry:
+ cluster-connect-timeout-millis: 3000
+ network:
+ cluster-members:
+ - localhost:5901
\ No newline at end of file
diff --git a/seatunnel-web-it/src/test/resources/hazelcast.yaml
b/seatunnel-web-it/src/test/resources/hazelcast.yaml
new file mode 100644
index 00000000..7ac730e0
--- /dev/null
+++ b/seatunnel-web-it/src/test/resources/hazelcast.yaml
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+ cluster-name: seatunnel-test
+ network:
+ rest-api:
+ enabled: true
+ endpoint-groups:
+ CLUSTER_WRITE:
+ enabled: true
+ DATA:
+ enabled: true
+ join:
+ tcp-ip:
+ enabled: true
+ member-list:
+ - localhost
+ port:
+ auto-increment: true
+ port: 5901
+ properties:
+ hazelcast.invocation.max.retry.count: 20
+ hazelcast.tcp.join.port.try.count: 30
+ hazelcast.logging.type: log4j2
+ hazelcast.operation.generic.thread.count: 50
+ hazelcast.heartbeat.failuredetector.type: phi-accrual
+ hazelcast.heartbeat.interval.seconds: 2
+ hazelcast.max.no.heartbeat.seconds: 180
+ hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
+ hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
+ hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
+
diff --git a/seatunnel-web-it/src/test/resources/logback-spring.xml
b/seatunnel-web-it/src/test/resources/logback-spring.xml
new file mode 100644
index 00000000..145a239d
--- /dev/null
+++ b/seatunnel-web-it/src/test/resources/logback-spring.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration scan="true" scanPeriod="10 seconds">
+ <springProperty scope="context" name="APP_NAME"
source="spring.application.name"/>
+ <property name="APP_LOG_PATH" value="target/logs" />
+ <property name="HOST_NAME" value="${HOSTNAME:-UNKNOWN}"/>
+ <property name="TRACE"
value="[tr:%X{X-B3-TraceId:-},sp:%X{X-B3-SpanId:-}]"/>
+ <include resource="org/springframework/boot/logging/logback/defaults.xml"
/>
+
+ <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} ${APP_NAME} ${HOST_NAME} %p
${TRACE} [%thread] [%C{0}.%M\(\):%L] - %m%n</pattern>
+ <charset>UTF-8</charset>
+ </encoder>
+ </appender>
+
+ <appender name="seatunnel-web"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${APP_LOG_PATH}/seatunnel-web.log</file>
+ <append>true</append>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+
<FileNamePattern>${APP_LOG_PATH}/seatunnel-web.log.%d{yyyy-MM-dd}.%i.log</FileNamePattern>
+ <MaxHistory>30</MaxHistory>
+ <MaxFileSize>100MB</MaxFileSize>
+ </rollingPolicy>
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} ${APP_NAME} ${HOST_NAME} %p
[%thread] [%C{0}.%M\(\):%L] - %m%n</pattern>
+ <charset>UTF-8</charset>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="seatunnel-web" />
+ <appender-ref ref="console" />
+ </root>
+</configuration>
diff --git a/seatunnel-web-it/src/test/resources/seatunnel.yaml
b/seatunnel-web-it/src/test/resources/seatunnel.yaml
new file mode 100644
index 00000000..5961c839
--- /dev/null
+++ b/seatunnel-web-it/src/test/resources/seatunnel.yaml
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+ engine:
+ history-job-expire-minutes: 1440
+ backup-count: 1
+ queue-type: blockingqueue
+ print-execution-info-interval: 60
+ print-job-metrics-info-interval: 60
+ slot-service:
+ dynamic-slot: true
+ checkpoint:
+ interval: 10000
+ timeout: 60000
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ namespace: /tmp/seatunnel/checkpoint_snapshot
+ storage.type: hdfs
+ fs.defaultFS: file:///tmp/ # Ensure that the directory has write
permission
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index 0a42b54b..0a8dfa02 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -7,7 +7,12 @@ commons-io-2.11.0.jar
config-1.3.3.jar
db2jcc-db2jcc4.jar
gson-2.8.6.jar
-guava-19.0.jar
+guava-33.2.1-jre.jar
+checker-qual-3.10.0.jar
+error_prone_annotations-2.26.1.jar
+failureaccess-1.0.2.jar
+j2objc-annotations-3.0.0.jar
+listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
hibernate-validator-6.2.2.Final.jar
jackson-annotations-2.12.6.jar
jackson-core-2.13.3.jar