This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f855cddf91 [Feature][RestAPI] Support submit job with seatunnel style
hocon format config (#8000)
f855cddf91 is described below
commit f855cddf91c0b5d8ddd8436c706603b9d013cd92
Author: CosmosNi <[email protected]>
AuthorDate: Sat Nov 23 09:37:42 2024 +0800
[Feature][RestAPI] Support submit job with seatunnel style hocon format
config (#8000)
Co-authored-by: Jia Fan <[email protected]>
---
docs/en/seatunnel-engine/rest-api-v2.md | 40 +-
docs/zh/seatunnel-engine/rest-api-v1.md | 2 +-
docs/zh/seatunnel-engine/rest-api-v2.md | 39 +-
.../connectors/seatunnel/jdbc/JdbcGBase8aIT.java | 2 +
.../seatunnel/engine/e2e/CheckpointEnableIT.java | 12 +-
.../engine/e2e/ClusterSeaTunnelContainer.java | 534 +++++++++++++++++++++
.../seatunnel/engine/server/rest/RestConstant.java | 4 +
.../engine/server/rest/service/JobInfoService.java | 15 +-
.../engine/server/rest/servlet/BaseServlet.java | 14 +
.../server/rest/servlet/SubmitJobServlet.java | 10 +-
10 files changed, 657 insertions(+), 15 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md
b/docs/en/seatunnel-engine/rest-api-v2.md
index 2c642dd8fb..40d0741360 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -384,15 +384,18 @@ When we can't get the job info, the response will be:
#### Parameters
-> | name | type | data type | description
|
+> | name | type | data type | description
|
> |----------------------|----------|-----------|-----------------------------------|
> | jobId | optional | string | job id
> |
> | jobName | optional | string | job name
> |
> | isStartWithSavePoint | optional | string | if job is started with save
> point |
+> | format | optional | string | config format, support json
and hocon, default json |
#### Body
-```json
+You can choose json or hocon to pass request body.
+The json format example:
+``` json
{
"env": {
"job.mode": "batch"
@@ -421,6 +424,37 @@ When we can't get the job info, the response will be:
]
}
```
+The hocon format example:
+``` hocon
+env {
+ job.mode = "batch"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ card = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Console {
+ source_table_name = "fake"
+ }
+}
+
+```
+
#### Responses
@@ -810,4 +844,4 @@ Returns a list of logs from the requested node.
To get a list of logs from the current node: `http://localhost:5801/log`
To get the content of a log file:
`http://localhost:5801/log/job-898380162133917698.log`
-</details>
\ No newline at end of file
+</details>
diff --git a/docs/zh/seatunnel-engine/rest-api-v1.md
b/docs/zh/seatunnel-engine/rest-api-v1.md
index 5aa9f111df..217eba6e49 100644
--- a/docs/zh/seatunnel-engine/rest-api-v1.md
+++ b/docs/zh/seatunnel-engine/rest-api-v1.md
@@ -2,7 +2,7 @@
sidebar_position: 11
---
-# RESTful API
+# RESTful API V1
:::caution warn
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md
b/docs/zh/seatunnel-engine/rest-api-v2.md
index 0ec9741d40..3d822be2f9 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -2,7 +2,7 @@
sidebar_position: 12
---
-# RESTful API
+# RESTful API V2
SeaTunnel有一个用于监控的API,可用于查询运行作业的状态和统计信息,以及最近完成的作业。监控API是RESTful风格的,它接受HTTP请求并使用JSON数据格式进行响应。
@@ -380,14 +380,17 @@ seatunnel:
#### 参数
-> | 参数名称 | 是否必传 | 参数类型 | 参数描述
|
->
|----------------------|----------|--------|-----------------------------------|
+> | 参数名称 | 是否必传 | 参数类型 | 参数描述
|
+>
|----------------------|----------|-----------------------------------|-----------------------------------|
> | jobId | optional | string | job id
> |
> | jobName | optional | string | job name
> |
> | isStartWithSavePoint | optional | string | if job is started with save
> point |
+> | format | optional | string | 配置风格,支持json和hocon,默认 json
|
#### 请求体
+你可以选择用json或者hocon的方式来传递请求体。
+Json请求示例:
```json
{
"env": {
@@ -418,6 +421,36 @@ seatunnel:
}
```
+Hocon请求示例:
+```hocon
+env {
+ job.mode = "batch"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ card = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Console {
+ source_table_name = "fake"
+ }
+}
+
+```
#### 响应
```json
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
index f0c4e60912..20573cfb11 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.Disabled;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerLoggerFactory;
@@ -39,6 +40,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+@Disabled("due to the driver cannot be downloaded")
public class JdbcGBase8aIT extends AbstractJdbcIT {
private static final String GBASE_IMAGE = "shihd/gbase8a:1.0";
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index f415082941..661da1b7cd 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -200,7 +200,7 @@ public class CheckpointEnableIT extends TestSuiteBase {
@TestTemplate
@DisabledOnContainer(
- value = {TestContainerId.FLINK_1_17, TestContainerId.FLINK_1_18},
+ value = {},
type = {EngineType.SEATUNNEL, EngineType.SPARK},
disabledReason =
"depending on the engine, the logic for determining
whether a checkpoint is enabled is different")
@@ -232,8 +232,14 @@ public class CheckpointEnableIT extends TestSuiteBase {
* disabled. reference {@link
* org.apache.flink.runtime.jobgraph.JobGraph#isCheckpointingEnabled()}
*/
- Assertions.assertEquals(Long.MAX_VALUE,
jobConfig.getOrDefault("interval", 0L));
- Assertions.assertEquals(0, enableExecResult.getExitCode());
+ if (container.identifier().equals(TestContainerId.FLINK_1_13)
+ || container.identifier().equals(TestContainerId.FLINK_1_14)
+ || container.identifier().equals(TestContainerId.FLINK_1_15)
+ || container.identifier().equals(TestContainerId.FLINK_1_16)) {
+ Assertions.assertEquals(Long.MAX_VALUE,
jobConfig.getOrDefault("interval", 0L));
+ } else {
+ Assertions.assertEquals(0, jobConfig.getOrDefault("interval", 0));
+ }
}
@TestTemplate
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
index 804c77f340..8dd6cd5321 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
@@ -61,6 +61,8 @@ public class ClusterSeaTunnelContainer extends
SeaTunnelContainer {
private static final String jobName = "test测试";
private static final String paramJobName = "param_test测试";
+ private static final String hoconJobName = "test_hocon测试";
+ private static final String hoconParamJobName = "param_test_hocon测试";
private static final String http = "http://";
@@ -77,6 +79,10 @@ public class ClusterSeaTunnelContainer extends
SeaTunnelContainer {
private static final long CUSTOM_JOB_ID_2 = 862969647010611202L;
+ private static final long HOCON_CUSTOM_JOB_ID_1 = 862969647010611203L;
+
+ private static final long HOCON_CUSTOM_JOB_ID_2 = 862969647010611204L;
+
private static List<Tuple3<Integer, String, Long>> tasks;
@Override
@@ -108,6 +114,14 @@ public class ClusterSeaTunnelContainer extends
SeaTunnelContainer {
new Tuple3<>(
server.getMappedPort(5801), RestConstant.CONTEXT_PATH,
CUSTOM_JOB_ID_1));
tasks.add(new Tuple3<>(server.getMappedPort(8080), "",
CUSTOM_JOB_ID_2));
+
+ tasks.add(
+ new Tuple3<>(
+ server.getMappedPort(5801),
+ RestConstant.CONTEXT_PATH,
+ HOCON_CUSTOM_JOB_ID_1));
+
+ tasks.add(new Tuple3<>(server.getMappedPort(8080), "",
HOCON_CUSTOM_JOB_ID_2));
}
@Override
@@ -706,6 +720,419 @@ public class ClusterSeaTunnelContainer extends
SeaTunnelContainer {
});
}
+ @Test
+ public void testHoconSubmitJobWithCustomJobId() {
+ AtomicInteger i = new AtomicInteger();
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(2);
+ submitHoconJobAndAssertResponse(
+ container,
+ task._1(),
+ task._2(),
+ i,
+ hoconParamJobName + "&jobId=" + task._3(),
+ true,
+ task._3().toString());
+ });
+ }
+
+ @Test
+ public void testHoconSubmitJobWithCustomJobIdV2() {
+ AtomicInteger i = new AtomicInteger();
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(3);
+ submitHoconJobAndAssertResponse(
+ container,
+ task._1(),
+ task._2(),
+ i,
+ hoconParamJobName + "&jobId=" + task._3(),
+ true,
+ task._3().toString());
+ });
+ }
+
+ @Test
+ public void testHoconSubmitJobWithoutCustomJobId() {
+ AtomicInteger i = new AtomicInteger();
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(2);
+ submitHoconJobAndAssertResponse(
+ container,
+ task._1(),
+ task._2(),
+ i,
+ hoconParamJobName,
+ false,
+ task._3().toString());
+ });
+ }
+
+ @Test
+ public void testHoconSubmitJobWithoutCustomJobIdV2() {
+ AtomicInteger i = new AtomicInteger();
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(3);
+ submitHoconJobAndAssertResponse(
+ container,
+ task._1(),
+ task._2(),
+ i,
+ hoconParamJobName,
+ false,
+ task._3().toString());
+ });
+ }
+
+ @Test
+ public void testHoconStartWithSavePointWithoutJobId() {
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(2);
+ Response response =
+ submitHoconJob(
+ "BATCH",
+ container,
+ task._1(),
+ task._2(),
+ true,
+ hoconJobName,
+ hoconParamJobName);
+ response.then()
+ .statusCode(400)
+ .body(
+ "message",
+ equalTo(
+ "Please provide jobId when
start with save point."));
+ });
+ }
+
+ @Test
+ public void testHoconStartWithSavePointWithoutJobIdV2() {
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(3);
+ Response response =
+ submitHoconJob(
+ "BATCH",
+ container,
+ task._1(),
+ task._2(),
+ true,
+ hoconJobName,
+ hoconParamJobName);
+ response.then()
+ .statusCode(400)
+ .body(
+ "message",
+ equalTo(
+ "Please provide jobId when
start with save point."));
+ });
+ }
+
+ @Test
+ public void testHoconStopJob() {
+ AtomicInteger i = new AtomicInteger();
+
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(2);
+ String jobId =
+ submitHoconJob(
+ container,
+ task._1(),
+ task._2(),
+ "STREAMING",
+ hoconJobName,
+ hoconParamJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.RUNNING_JOB_URL
+ +
"/"
+ +
jobId)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+
+ String parameters =
+ "{"
+ + "\"jobId\":"
+ + jobId
+ + ","
+ + "\"isStopWithSavePoint\":true}";
+
+ given().body(parameters)
+ .post(
+ http
+ + container.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.FINISHED_JOBS_INFO
+ +
"/SAVEPOINT_DONE")
+ .then()
+ .statusCode(200)
+ .body(
+ "[" +
i.get() + "].jobId",
+
equalTo(jobId)));
+
+ String jobId2 =
+ submitHoconJob(
+ container,
+ task._1(),
+ task._2(),
+ "STREAMING",
+ hoconJobName,
+ hoconParamJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.RUNNING_JOB_URL
+ +
"/"
+ +
jobId2)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+ parameters =
+ "{"
+ + "\"jobId\":"
+ + jobId2
+ + ","
+ + "\"isStopWithSavePoint\":false}";
+
+ given().body(parameters)
+ .post(
+ http
+ + container.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId2));
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.FINISHED_JOBS_INFO
+ +
"/CANCELED")
+ .then()
+ .statusCode(200)
+ .body(
+ "[" +
i.get() + "].jobId",
+
equalTo(jobId2)));
+ i.getAndIncrement();
+ });
+ }
+
+ @Test
+ public void testHoconStopJobV2() {
+ AtomicInteger i = new AtomicInteger();
+
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(3);
+ String jobId =
+ submitHoconJob(
+ container,
+ task._1(),
+ task._2(),
+ "STREAMING",
+ hoconJobName,
+ hoconParamJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.RUNNING_JOB_URL
+ +
"/"
+ +
jobId)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+
+ String parameters =
+ "{"
+ + "\"jobId\":"
+ + jobId
+ + ","
+ + "\"isStopWithSavePoint\":true}";
+
+ given().body(parameters)
+ .post(
+ http
+ + container.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.FINISHED_JOBS_INFO
+ +
"/SAVEPOINT_DONE")
+ .then()
+ .statusCode(200)
+ .body(
+ "[" +
i.get() + "].jobId",
+
equalTo(jobId)));
+
+ String jobId2 =
+ submitHoconJob(
+ container,
+ task._1(),
+ task._2(),
+ "STREAMING",
+ hoconJobName,
+ hoconParamJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.RUNNING_JOB_URL
+ +
"/"
+ +
jobId2)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+ parameters =
+ "{"
+ + "\"jobId\":"
+ + jobId2
+ + ","
+ + "\"isStopWithSavePoint\":false}";
+
+ given().body(parameters)
+ .post(
+ http
+ + container.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId2));
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
task._1()
+ +
task._2()
+ +
RestConstant
+
.FINISHED_JOBS_INFO
+ +
"/CANCELED")
+ .then()
+ .statusCode(200)
+ .body(
+ "[" +
i.get() + "].jobId",
+
equalTo(jobId2)));
+
+ i.getAndIncrement();
+ });
+ }
+
private void submitJobs(
String jobMode,
GenericContainer<?> container,
@@ -1015,4 +1442,111 @@ public class ClusterSeaTunnelContainer extends
SeaTunnelContainer {
+ contextPath
+ RestConstant.FINISHED_JOBS_INFO;
}
+
+ private Response submitHoconJob(
+ GenericContainer<?> container,
+ int port,
+ String contextPath,
+ String jobMode,
+ String jobName,
+ String paramJobName) {
+ return submitHoconJob(jobMode, container, port, contextPath, false,
jobName, paramJobName);
+ }
+
+ private Response submitHoconJob(
+ String jobMode,
+ GenericContainer<?> container,
+ int port,
+ String contextPath,
+ boolean isStartWithSavePoint,
+ String jobName,
+ String paramJobName) {
+ String requestBody =
+ String.format(
+ "env {\n"
+ + " job.name = \"%s\"\n"
+ + " job.mode = \"%s\"\n"
+ + "}\n\n"
+ + "source {\n"
+ + " FakeSource {\n"
+ + " result_table_name = \"fake\"\n"
+ + " schema = {\n"
+ + " fields {\n"
+ + " name = \"string\"\n"
+ + " age = \"int\"\n"
+ + " card = \"int\"\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + "}\n\n"
+ + "transform {\n"
+ + "}\n\n"
+ + "sink {\n"
+ + " Console {\n"
+ + " source_table_name = \"fake\"\n"
+ + " }\n"
+ + "}\n",
+ jobName, jobMode);
+ String parameters = null;
+ if (paramJobName != null) {
+ parameters = "jobName=" + paramJobName;
+ }
+ if (isStartWithSavePoint) {
+ parameters = parameters + "&isStartWithSavePoint=true";
+ }
+ parameters = parameters + "&format=hocon";
+ Response response =
+ given().body(requestBody)
+ .header("Content-Type", "text/plain; charset=utf-8")
+ .post(
+ parameters == null
+ ? http
+ + container.getHost()
+ + colon
+ + port
+ + contextPath
+ + RestConstant.SUBMIT_JOB_URL
+ : http
+ + container.getHost()
+ + colon
+ + port
+ + contextPath
+ + RestConstant.SUBMIT_JOB_URL
+ + "?"
+ + parameters);
+ return response;
+ }
+
+ private void submitHoconJobAndAssertResponse(
+ GenericContainer<? extends GenericContainer<?>> container,
+ int port,
+ String contextPath,
+ AtomicInteger i,
+ String customParam,
+ boolean isCustomJobId,
+ String customJobId) {
+ Response response = submitHoconJobAndResponse(container, port,
contextPath, i, customParam);
+ String jobId = response.getBody().jsonPath().getString("jobId");
+ assertResponse(container, port, contextPath, i, jobId, customJobId,
isCustomJobId);
+ i.getAndIncrement();
+ }
+
+ private Response submitHoconJobAndResponse(
+ GenericContainer<? extends GenericContainer<?>> container,
+ int port,
+ String contextPath,
+ AtomicInteger i,
+ String customParam) {
+ Response response =
+ i.get() == 0
+ ? submitHoconJob(
+ container, port, contextPath, "BATCH",
hoconJobName, customParam)
+ : submitHoconJob(container, port, contextPath,
"BATCH", hoconJobName, null);
+ if (i.get() == 0) {
+ response.then().statusCode(200).body("jobName",
equalTo(hoconParamJobName));
+ } else {
+ response.then().statusCode(200).body("jobName",
equalTo(hoconJobName));
+ }
+ return response;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index e8c39da7f2..8a4b6e0261 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -27,6 +27,8 @@ public class RestConstant {
public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint";
+ public static final String CONFIG_FORMAT = "format";
+
public static final String JOB_STATUS = "jobStatus";
public static final String CREATE_TIME = "createTime";
@@ -45,6 +47,8 @@ public class RestConstant {
public static final String METRICS = "metrics";
+ public static final String HOCON = "hocon";
+
public static final String TABLE_SOURCE_RECEIVED_COUNT =
"TableSourceReceivedCount";
public static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
public static final String TABLE_SOURCE_RECEIVED_QPS =
"TableSourceReceivedQPS";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
index 2c497510ca..7d21c2023c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.rest.service;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.common.utils.JsonUtils;
@@ -37,11 +38,15 @@ import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import scala.Tuple2;
+import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.CONFIG_FORMAT;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.HOCON;
+
public class JobInfoService extends BaseService {
public JobInfoService(NodeEngineImpl nodeEngine) {
@@ -157,9 +162,13 @@ public class JobInfoService extends BaseService {
&& requestParams.get(RestConstant.JOB_ID) == null) {
throw new IllegalArgumentException("Please provide jobId when
start with save point.");
}
-
- Config config = RestUtil.buildConfig(requestHandle(requestBody),
false);
-
+ Config config;
+ if (HOCON.equalsIgnoreCase(requestParams.get(CONFIG_FORMAT))) {
+ String requestBodyStr = new String(requestBody,
StandardCharsets.UTF_8);
+ config = ConfigFactory.parseString(requestBodyStr);
+ } else {
+ config = RestUtil.buildConfig(requestHandle(requestBody), false);
+ }
SeaTunnelServer seaTunnelServer = getSeaTunnelServer(false);
return submitJobInternal(config, requestParams, seaTunnelServer,
nodeEngine.getNode());
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
index b37883f7c3..ba9727dab7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
@@ -116,6 +116,20 @@ public class BaseServlet extends HttpServlet {
return requestBody.getBytes(StandardCharsets.UTF_8);
}
+ public byte[] requestHoconBody(HttpServletRequest req) throws IOException {
+ StringBuilder stringBuilder = new StringBuilder();
+ String line;
+
+ try (BufferedReader reader = req.getReader()) {
+ while ((line = reader.readLine()) != null) {
+ stringBuilder.append(line).append("\n");
+ }
+ }
+
+ String requestBody = stringBuilder.toString();
+ return requestBody.getBytes(StandardCharsets.UTF_8);
+ }
+
protected Map<String, String> getParameterMap(HttpServletRequest req) {
Map<String, String> reqParameterMap = new HashMap<>();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
index b4e61c959c..31b9d60721 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
@@ -27,6 +27,9 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Map;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.CONFIG_FORMAT;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.HOCON;
+
public class SubmitJobServlet extends BaseServlet {
private final JobInfoService jobInfoService;
@@ -39,7 +42,10 @@ public class SubmitJobServlet extends BaseServlet {
public void doPost(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
Map<String, String> requestParams = getParameterMap(req);
-
- writeJson(resp, jobInfoService.submitJob(requestParams,
requestBody(req)));
+ if (HOCON.equalsIgnoreCase(requestParams.get(CONFIG_FORMAT))) {
+ writeJson(resp, jobInfoService.submitJob(requestParams,
requestHoconBody(req)));
+ } else {
+ writeJson(resp, jobInfoService.submitJob(requestParams,
requestBody(req)));
+ }
}
}