This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f855cddf91 [Feature][RestAPI] Support submit job with seatunnel style 
hocon format config (#8000)
f855cddf91 is described below

commit f855cddf91c0b5d8ddd8436c706603b9d013cd92
Author: CosmosNi <[email protected]>
AuthorDate: Sat Nov 23 09:37:42 2024 +0800

    [Feature][RestAPI] Support submit job with seatunnel style hocon format 
config (#8000)
    
    Co-authored-by: Jia Fan <[email protected]>
---
 docs/en/seatunnel-engine/rest-api-v2.md            |  40 +-
 docs/zh/seatunnel-engine/rest-api-v1.md            |   2 +-
 docs/zh/seatunnel-engine/rest-api-v2.md            |  39 +-
 .../connectors/seatunnel/jdbc/JdbcGBase8aIT.java   |   2 +
 .../seatunnel/engine/e2e/CheckpointEnableIT.java   |  12 +-
 .../engine/e2e/ClusterSeaTunnelContainer.java      | 534 +++++++++++++++++++++
 .../seatunnel/engine/server/rest/RestConstant.java |   4 +
 .../engine/server/rest/service/JobInfoService.java |  15 +-
 .../engine/server/rest/servlet/BaseServlet.java    |  14 +
 .../server/rest/servlet/SubmitJobServlet.java      |  10 +-
 10 files changed, 657 insertions(+), 15 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api-v2.md 
b/docs/en/seatunnel-engine/rest-api-v2.md
index 2c642dd8fb..40d0741360 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -384,15 +384,18 @@ When we can't get the job info, the response will be:
 
 #### Parameters
 
-> |         name         |   type   | data type |            description       
     |
+> | name                 |   type   | data type |            description       
     |
 > |----------------------|----------|-----------|-----------------------------------|
 > | jobId                | optional | string    | job id                       
 >      |
 > | jobName              | optional | string    | job name                     
 >      |
 > | isStartWithSavePoint | optional | string    | if job is started with save 
 > point |
+> | format               | optional | string    | config format, supports json 
and hocon, defaults to json |
 
 #### Body
 
-```json
+You can pass the request body in either json or hocon format.
+The json format example:
+``` json
 {
     "env": {
         "job.mode": "batch"
@@ -421,6 +424,37 @@ When we can't get the job info, the response will be:
     ]
 }
 ```
+The hocon format example:
+``` hocon
+env {
+  job.mode = "batch"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+        card = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Console {
+    source_table_name = "fake"
+  }
+}
+
+```
+
 
 #### Responses
 
@@ -810,4 +844,4 @@ Returns a list of logs from the requested node.
 To get a list of logs from the current node: `http://localhost:5801/log`
 To get the content of a log file: 
`http://localhost:5801/log/job-898380162133917698.log`
 
-</details>
\ No newline at end of file
+</details>
diff --git a/docs/zh/seatunnel-engine/rest-api-v1.md 
b/docs/zh/seatunnel-engine/rest-api-v1.md
index 5aa9f111df..217eba6e49 100644
--- a/docs/zh/seatunnel-engine/rest-api-v1.md
+++ b/docs/zh/seatunnel-engine/rest-api-v1.md
@@ -2,7 +2,7 @@
 sidebar_position: 11
 ---
 
-# RESTful API
+# RESTful API V1
 
 :::caution warn
 
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md 
b/docs/zh/seatunnel-engine/rest-api-v2.md
index 0ec9741d40..3d822be2f9 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -2,7 +2,7 @@
 sidebar_position: 12
 ---
 
-# RESTful API
+# RESTful API V2
 
 
SeaTunnel有一个用于监控的API,可用于查询运行作业的状态和统计信息,以及最近完成的作业。监控API是RESTful风格的,它接受HTTP请求并使用JSON数据格式进行响应。
 
@@ -380,14 +380,17 @@ seatunnel:
 
 #### 参数
 
-> |         参数名称         |   是否必传   |  参数类型  |               参数描述              
  |
-> 
|----------------------|----------|--------|-----------------------------------|
+> |         参数名称         |   是否必传   |  参数类型  | 参数描述                            
  |
+> 
|----------------------|----------|-----------------------------------|-----------------------------------|
 > | jobId                | optional | string | job id                          
 >   |
 > | jobName              | optional | string | job name                        
 >   |
 > | isStartWithSavePoint | optional | string | if job is started with save 
 > point |
+> | format               | optional | string    | 配置风格,支持json和hocon,默认 json    
     |
 
 #### 请求体
 
+你可以选择用json或者hocon的方式来传递请求体。
+Json请求示例:
 ```json
 {
     "env": {
@@ -418,6 +421,36 @@ seatunnel:
 }
 ```
 
+Hocon请求示例:
+```hocon
+env {
+  job.mode = "batch"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+        card = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Console {
+    source_table_name = "fake"
+  }
+}
+
+```
 #### 响应
 
 ```json
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
index f0c4e60912..20573cfb11 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.junit.jupiter.api.Disabled;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerLoggerFactory;
@@ -39,6 +40,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+@Disabled("due to the driver cannot be downloaded")
 public class JdbcGBase8aIT extends AbstractJdbcIT {
 
     private static final String GBASE_IMAGE = "shihd/gbase8a:1.0";
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index f415082941..661da1b7cd 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -200,7 +200,7 @@ public class CheckpointEnableIT extends TestSuiteBase {
 
     @TestTemplate
     @DisabledOnContainer(
-            value = {TestContainerId.FLINK_1_17, TestContainerId.FLINK_1_18},
+            value = {},
             type = {EngineType.SEATUNNEL, EngineType.SPARK},
             disabledReason =
                     "depending on the engine, the logic for determining 
whether a checkpoint is enabled is different")
@@ -232,8 +232,14 @@ public class CheckpointEnableIT extends TestSuiteBase {
          * disabled. reference {@link
          * org.apache.flink.runtime.jobgraph.JobGraph#isCheckpointingEnabled()}
          */
-        Assertions.assertEquals(Long.MAX_VALUE, 
jobConfig.getOrDefault("interval", 0L));
-        Assertions.assertEquals(0, enableExecResult.getExitCode());
+        if (container.identifier().equals(TestContainerId.FLINK_1_13)
+                || container.identifier().equals(TestContainerId.FLINK_1_14)
+                || container.identifier().equals(TestContainerId.FLINK_1_15)
+                || container.identifier().equals(TestContainerId.FLINK_1_16)) {
+            Assertions.assertEquals(Long.MAX_VALUE, 
jobConfig.getOrDefault("interval", 0L));
+        } else {
+            Assertions.assertEquals(0, jobConfig.getOrDefault("interval", 0));
+        }
     }
 
     @TestTemplate
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
index 804c77f340..8dd6cd5321 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
@@ -61,6 +61,8 @@ public class ClusterSeaTunnelContainer extends 
SeaTunnelContainer {
 
     private static final String jobName = "test测试";
     private static final String paramJobName = "param_test测试";
+    private static final String hoconJobName = "test_hocon测试";
+    private static final String hoconParamJobName = "param_test_hocon测试";
 
     private static final String http = "http://";
 
@@ -77,6 +79,10 @@ public class ClusterSeaTunnelContainer extends 
SeaTunnelContainer {
 
     private static final long CUSTOM_JOB_ID_2 = 862969647010611202L;
 
+    private static final long HOCON_CUSTOM_JOB_ID_1 = 862969647010611203L;
+
+    private static final long HOCON_CUSTOM_JOB_ID_2 = 862969647010611204L;
+
     private static List<Tuple3<Integer, String, Long>> tasks;
 
     @Override
@@ -108,6 +114,14 @@ public class ClusterSeaTunnelContainer extends 
SeaTunnelContainer {
                 new Tuple3<>(
                         server.getMappedPort(5801), RestConstant.CONTEXT_PATH, 
CUSTOM_JOB_ID_1));
         tasks.add(new Tuple3<>(server.getMappedPort(8080), "", 
CUSTOM_JOB_ID_2));
+
+        tasks.add(
+                new Tuple3<>(
+                        server.getMappedPort(5801),
+                        RestConstant.CONTEXT_PATH,
+                        HOCON_CUSTOM_JOB_ID_1));
+
+        tasks.add(new Tuple3<>(server.getMappedPort(8080), "", 
HOCON_CUSTOM_JOB_ID_2));
     }
 
     @Override
@@ -706,6 +720,419 @@ public class ClusterSeaTunnelContainer extends 
SeaTunnelContainer {
                         });
     }
 
+    @Test
+    public void testHoconSubmitJobWithCustomJobId() {
+        AtomicInteger i = new AtomicInteger();
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            Tuple3<Integer, String, Long> task = tasks.get(2);
+                            submitHoconJobAndAssertResponse(
+                                    container,
+                                    task._1(),
+                                    task._2(),
+                                    i,
+                                    hoconParamJobName + "&jobId=" + task._3(),
+                                    true,
+                                    task._3().toString());
+                        });
+    }
+
+    @Test
+    public void testHoconSubmitJobWithCustomJobIdV2() {
+        AtomicInteger i = new AtomicInteger();
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            Tuple3<Integer, String, Long> task = tasks.get(3);
+                            submitHoconJobAndAssertResponse(
+                                    container,
+                                    task._1(),
+                                    task._2(),
+                                    i,
+                                    hoconParamJobName + "&jobId=" + task._3(),
+                                    true,
+                                    task._3().toString());
+                        });
+    }
+
+    @Test
+    public void testHoconSubmitJobWithoutCustomJobId() {
+        AtomicInteger i = new AtomicInteger();
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            Tuple3<Integer, String, Long> task = tasks.get(2);
+                            submitHoconJobAndAssertResponse(
+                                    container,
+                                    task._1(),
+                                    task._2(),
+                                    i,
+                                    hoconParamJobName,
+                                    false,
+                                    task._3().toString());
+                        });
+    }
+
+    @Test
+    public void testHoconSubmitJobWithoutCustomJobIdV2() {
+        AtomicInteger i = new AtomicInteger();
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            Tuple3<Integer, String, Long> task = tasks.get(3);
+                            submitHoconJobAndAssertResponse(
+                                    container,
+                                    task._1(),
+                                    task._2(),
+                                    i,
+                                    hoconParamJobName,
+                                    false,
+                                    task._3().toString());
+                        });
+    }
+
+    @Test
+    public void testHoconStartWithSavePointWithoutJobId() {
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            Tuple3<Integer, String, Long> task = tasks.get(2);
+                            Response response =
+                                    submitHoconJob(
+                                            "BATCH",
+                                            container,
+                                            task._1(),
+                                            task._2(),
+                                            true,
+                                            hoconJobName,
+                                            hoconParamJobName);
+                            response.then()
+                                    .statusCode(400)
+                                    .body(
+                                            "message",
+                                            equalTo(
+                                                    "Please provide jobId when 
start with save point."));
+                        });
+    }
+
+    @Test
+    public void testHoconStartWithSavePointWithoutJobIdV2() {
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            Tuple3<Integer, String, Long> task = tasks.get(3);
+                            Response response =
+                                    submitHoconJob(
+                                            "BATCH",
+                                            container,
+                                            task._1(),
+                                            task._2(),
+                                            true,
+                                            hoconJobName,
+                                            hoconParamJobName);
+                            response.then()
+                                    .statusCode(400)
+                                    .body(
+                                            "message",
+                                            equalTo(
+                                                    "Please provide jobId when 
start with save point."));
+                        });
+    }
+
+    @Test
+    public void testHoconStopJob() {
+        AtomicInteger i = new AtomicInteger();
+
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            Tuple3<Integer, String, Long> task = tasks.get(2);
+                            String jobId =
+                                    submitHoconJob(
+                                                    container,
+                                                    task._1(),
+                                                    task._2(),
+                                                    "STREAMING",
+                                                    hoconJobName,
+                                                    hoconParamJobName)
+                                            .getBody()
+                                            .jsonPath()
+                                            .getString("jobId");
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
task._1()
+                                                                            + 
task._2()
+                                                                            + 
RestConstant
+                                                                               
     .RUNNING_JOB_URL
+                                                                            + 
"/"
+                                                                            + 
jobId)
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body("jobStatus", 
equalTo("RUNNING")));
+
+                            String parameters =
+                                    "{"
+                                            + "\"jobId\":"
+                                            + jobId
+                                            + ","
+                                            + "\"isStopWithSavePoint\":true}";
+
+                            given().body(parameters)
+                                    .post(
+                                            http
+                                                    + container.getHost()
+                                                    + colon
+                                                    + task._1()
+                                                    + task._2()
+                                                    + 
RestConstant.STOP_JOB_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobId", equalTo(jobId));
+
+                            Awaitility.await()
+                                    .atMost(6, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
task._1()
+                                                                            + 
task._2()
+                                                                            + 
RestConstant
+                                                                               
     .FINISHED_JOBS_INFO
+                                                                            + 
"/SAVEPOINT_DONE")
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body(
+                                                                    "[" + 
i.get() + "].jobId",
+                                                                    
equalTo(jobId)));
+
+                            String jobId2 =
+                                    submitHoconJob(
+                                                    container,
+                                                    task._1(),
+                                                    task._2(),
+                                                    "STREAMING",
+                                                    hoconJobName,
+                                                    hoconParamJobName)
+                                            .getBody()
+                                            .jsonPath()
+                                            .getString("jobId");
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
task._1()
+                                                                            + 
task._2()
+                                                                            + 
RestConstant
+                                                                               
     .RUNNING_JOB_URL
+                                                                            + 
"/"
+                                                                            + 
jobId2)
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body("jobStatus", 
equalTo("RUNNING")));
+                            parameters =
+                                    "{"
+                                            + "\"jobId\":"
+                                            + jobId2
+                                            + ","
+                                            + "\"isStopWithSavePoint\":false}";
+
+                            given().body(parameters)
+                                    .post(
+                                            http
+                                                    + container.getHost()
+                                                    + colon
+                                                    + task._1()
+                                                    + task._2()
+                                                    + 
RestConstant.STOP_JOB_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobId", equalTo(jobId2));
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
task._1()
+                                                                            + 
task._2()
+                                                                            + 
RestConstant
+                                                                               
     .FINISHED_JOBS_INFO
+                                                                            + 
"/CANCELED")
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body(
+                                                                    "[" + 
i.get() + "].jobId",
+                                                                    
equalTo(jobId2)));
+                            i.getAndIncrement();
+                        });
+    }
+
+    @Test
+    public void testHoconStopJobV2() {
+        AtomicInteger i = new AtomicInteger();
+
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            Tuple3<Integer, String, Long> task = tasks.get(3);
+                            String jobId =
+                                    submitHoconJob(
+                                                    container,
+                                                    task._1(),
+                                                    task._2(),
+                                                    "STREAMING",
+                                                    hoconJobName,
+                                                    hoconParamJobName)
+                                            .getBody()
+                                            .jsonPath()
+                                            .getString("jobId");
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
task._1()
+                                                                            + 
task._2()
+                                                                            + 
RestConstant
+                                                                               
     .RUNNING_JOB_URL
+                                                                            + 
"/"
+                                                                            + 
jobId)
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body("jobStatus", 
equalTo("RUNNING")));
+
+                            String parameters =
+                                    "{"
+                                            + "\"jobId\":"
+                                            + jobId
+                                            + ","
+                                            + "\"isStopWithSavePoint\":true}";
+
+                            given().body(parameters)
+                                    .post(
+                                            http
+                                                    + container.getHost()
+                                                    + colon
+                                                    + task._1()
+                                                    + task._2()
+                                                    + 
RestConstant.STOP_JOB_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobId", equalTo(jobId));
+
+                            Awaitility.await()
+                                    .atMost(6, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
task._1()
+                                                                            + 
task._2()
+                                                                            + 
RestConstant
+                                                                               
     .FINISHED_JOBS_INFO
+                                                                            + 
"/SAVEPOINT_DONE")
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body(
+                                                                    "[" + 
i.get() + "].jobId",
+                                                                    
equalTo(jobId)));
+
+                            String jobId2 =
+                                    submitHoconJob(
+                                                    container,
+                                                    task._1(),
+                                                    task._2(),
+                                                    "STREAMING",
+                                                    hoconJobName,
+                                                    hoconParamJobName)
+                                            .getBody()
+                                            .jsonPath()
+                                            .getString("jobId");
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
task._1()
+                                                                            + 
task._2()
+                                                                            + 
RestConstant
+                                                                               
     .RUNNING_JOB_URL
+                                                                            + 
"/"
+                                                                            + 
jobId2)
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body("jobStatus", 
equalTo("RUNNING")));
+                            parameters =
+                                    "{"
+                                            + "\"jobId\":"
+                                            + jobId2
+                                            + ","
+                                            + "\"isStopWithSavePoint\":false}";
+
+                            given().body(parameters)
+                                    .post(
+                                            http
+                                                    + container.getHost()
+                                                    + colon
+                                                    + task._1()
+                                                    + task._2()
+                                                    + 
RestConstant.STOP_JOB_URL)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("jobId", equalTo(jobId2));
+
+                            Awaitility.await()
+                                    .atMost(2, TimeUnit.MINUTES)
+                                    .untilAsserted(
+                                            () ->
+                                                    given().get(
+                                                                    http
+                                                                            + 
container.getHost()
+                                                                            + 
colon
+                                                                            + 
task._1()
+                                                                            + 
task._2()
+                                                                            + 
RestConstant
+                                                                               
     .FINISHED_JOBS_INFO
+                                                                            + 
"/CANCELED")
+                                                            .then()
+                                                            .statusCode(200)
+                                                            .body(
+                                                                    "[" + 
i.get() + "].jobId",
+                                                                    
equalTo(jobId2)));
+
+                            i.getAndIncrement();
+                        });
+    }
+
     private void submitJobs(
             String jobMode,
             GenericContainer<?> container,
@@ -1015,4 +1442,111 @@ public class ClusterSeaTunnelContainer extends 
SeaTunnelContainer {
                 + contextPath
                 + RestConstant.FINISHED_JOBS_INFO;
     }
+
+    /**
+     * Convenience overload that submits a HOCON job without the
+     * {@code isStartWithSavePoint} flag.
+     *
+     * <p>Note that the parameter order differs from the seven-argument overload it delegates
+     * to: here the container comes first and the job mode fourth.
+     *
+     * @param container container whose host receives the request
+     * @param port REST port on that host
+     * @param contextPath context path placed between the port and the submit-job path
+     * @param jobMode value written to {@code job.mode} in the submitted config
+     * @param jobName value written to {@code job.name} in the submitted config
+     * @param paramJobName value of the {@code jobName} query parameter, may be {@code null}
+     * @return the response returned by the seven-argument overload
+     */
+    private Response submitHoconJob(
+            GenericContainer<?> container,
+            int port,
+            String contextPath,
+            String jobMode,
+            String jobName,
+            String paramJobName) {
+        // This overload never asks for a savepoint restore.
+        final boolean startWithSavePoint = false;
+        return submitHoconJob(
+                jobMode, container, port, contextPath, startWithSavePoint, jobName, paramJobName);
+    }
+
+    /**
+     * Posts a FakeSource-to-Console job, written as a SeaTunnel HOCON config, to
+     * {@link RestConstant#SUBMIT_JOB_URL} on the given container.
+     *
+     * <p>The query string always ends with {@code format=hocon}. A {@code jobName} parameter
+     * and {@code isStartWithSavePoint=true} are added in front of it only when requested.
+     *
+     * @param jobMode value written to {@code job.mode} in the {@code env} block
+     * @param container container whose host receives the request
+     * @param port REST port on that host
+     * @param contextPath context path placed between the port and the submit-job path
+     * @param isStartWithSavePoint whether to add {@code isStartWithSavePoint=true}
+     * @param jobName value written to {@code job.name} in the {@code env} block
+     * @param paramJobName value of the {@code jobName} query parameter, or {@code null} to
+     *     leave that parameter out
+     * @return the response of the POST request
+     */
+    private Response submitHoconJob(
+            String jobMode,
+            GenericContainer<?> container,
+            int port,
+            String contextPath,
+            boolean isStartWithSavePoint,
+            String jobName,
+            String paramJobName) {
+        String requestBody =
+                String.format(
+                        "env {\n"
+                                + "  job.name = \"%s\"\n"
+                                + "  job.mode = \"%s\"\n"
+                                + "}\n\n"
+                                + "source {\n"
+                                + "  FakeSource {\n"
+                                + "    result_table_name = \"fake\"\n"
+                                + "    schema = {\n"
+                                + "      fields {\n"
+                                + "        name = \"string\"\n"
+                                + "        age = \"int\"\n"
+                                + "        card = \"int\"\n"
+                                + "      }\n"
+                                + "    }\n"
+                                + "  }\n"
+                                + "}\n\n"
+                                + "transform {\n"
+                                + "}\n\n"
+                                + "sink {\n"
+                                + "  Console {\n"
+                                + "    source_table_name = \"fake\"\n"
+                                + "  }\n"
+                                + "}\n",
+                        jobName, jobMode);
+
+        // Build the query string piece by piece. Appending to a null String would put the
+        // literal text "null" into the URL whenever paramJobName is absent.
+        StringBuilder query = new StringBuilder();
+        if (paramJobName != null) {
+            query.append("jobName=").append(paramJobName).append('&');
+        }
+        if (isStartWithSavePoint) {
+            query.append("isStartWithSavePoint=true&");
+        }
+        // The format marker is sent on every request made by this helper.
+        query.append("format=hocon");
+
+        return given().body(requestBody)
+                .header("Content-Type", "text/plain; charset=utf-8")
+                .post(
+                        http
+                                + container.getHost()
+                                + colon
+                                + port
+                                + contextPath
+                                + RestConstant.SUBMIT_JOB_URL
+                                + "?"
+                                + query);
+    }
+
+    /**
+     * Submits one HOCON job, passes the {@code jobId} from the response to
+     * {@code assertResponse}, and then advances the submission counter by one.
+     *
+     * @param container container whose host receives the request
+     * @param port REST port on that host
+     * @param contextPath context path of the REST API
+     * @param i submission counter; incremented once after the assertions have run
+     * @param customParam forwarded to {@code submitHoconJobAndResponse}
+     * @param isCustomJobId forwarded to {@code assertResponse}
+     * @param customJobId forwarded to {@code assertResponse}
+     */
+    private void submitHoconJobAndAssertResponse(
+            GenericContainer<? extends GenericContainer<?>> container,
+            int port,
+            String contextPath,
+            AtomicInteger i,
+            String customParam,
+            boolean isCustomJobId,
+            String customJobId) {
+        final String submittedJobId =
+                submitHoconJobAndResponse(container, port, contextPath, i, customParam)
+                        .getBody()
+                        .jsonPath()
+                        .getString("jobId");
+        assertResponse(
+                container, port, contextPath, i, submittedJobId, customJobId, isCustomJobId);
+        // Only the side effect matters here; the returned value is not used.
+        i.incrementAndGet();
+    }
+
+    /**
+     * Submits a BATCH HOCON job named {@code hoconJobName} and checks the status code and
+     * the {@code jobName} reported in the response.
+     *
+     * <p>While the counter is 0, {@code customParam} is sent as the {@code jobName} request
+     * parameter and the response is expected to report {@code hoconParamJobName}. For any
+     * other counter value no {@code jobName} parameter is sent and the response is expected
+     * to report {@code hoconJobName}.
+     *
+     * @param container container whose host receives the request
+     * @param port REST port on that host
+     * @param contextPath context path of the REST API
+     * @param i submission counter; only read here, never modified
+     * @param customParam job name to pass as request parameter on the first submission
+     * @return the response of the submission, after the checks have passed
+     */
+    private Response submitHoconJobAndResponse(
+            GenericContainer<? extends GenericContainer<?>> container,
+            int port,
+            String contextPath,
+            AtomicInteger i,
+            String customParam) {
+        final boolean firstSubmission = i.get() == 0;
+        final String paramJobName = firstSubmission ? customParam : null;
+        final String expectedJobName = firstSubmission ? hoconParamJobName : hoconJobName;
+
+        Response response =
+                submitHoconJob(container, port, contextPath, "BATCH", hoconJobName, paramJobName);
+        response.then().statusCode(200).body("jobName", equalTo(expectedJobName));
+        return response;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index e8c39da7f2..8a4b6e0261 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -27,6 +27,8 @@ public class RestConstant {
 
     public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint";
 
+    public static final String CONFIG_FORMAT = "format";
+
     public static final String JOB_STATUS = "jobStatus";
 
     public static final String CREATE_TIME = "createTime";
@@ -45,6 +47,8 @@ public class RestConstant {
 
     public static final String METRICS = "metrics";
 
+    public static final String HOCON = "hocon";
+
     public static final String TABLE_SOURCE_RECEIVED_COUNT = 
"TableSourceReceivedCount";
     public static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
     public static final String TABLE_SOURCE_RECEIVED_QPS = 
"TableSourceReceivedQPS";
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
index 2c497510ca..7d21c2023c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.rest.service;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.common.utils.JsonUtils;
@@ -37,11 +38,15 @@ import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import scala.Tuple2;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.CONFIG_FORMAT;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.HOCON;
+
 public class JobInfoService extends BaseService {
 
     public JobInfoService(NodeEngineImpl nodeEngine) {
@@ -157,9 +162,13 @@ public class JobInfoService extends BaseService {
                 && requestParams.get(RestConstant.JOB_ID) == null) {
             throw new IllegalArgumentException("Please provide jobId when 
start with save point.");
         }
-
-        Config config = RestUtil.buildConfig(requestHandle(requestBody), 
false);
-
+        Config config;
+        if (HOCON.equalsIgnoreCase(requestParams.get(CONFIG_FORMAT))) {
+            String requestBodyStr = new String(requestBody, 
StandardCharsets.UTF_8);
+            config = ConfigFactory.parseString(requestBodyStr);
+        } else {
+            config = RestUtil.buildConfig(requestHandle(requestBody), false);
+        }
         SeaTunnelServer seaTunnelServer = getSeaTunnelServer(false);
         return submitJobInternal(config, requestParams, seaTunnelServer, 
nodeEngine.getNode());
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
index b37883f7c3..ba9727dab7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
@@ -116,6 +116,20 @@ public class BaseServlet extends HttpServlet {
         return requestBody.getBytes(StandardCharsets.UTF_8);
     }
 
+    /**
+     * Reads the request body line by line and returns it as UTF-8 bytes, with a {@code \n}
+     * appended after every line that was read.
+     *
+     * @param req request whose reader supplies the body; the reader is closed before
+     *     returning
+     * @return the re-joined body encoded as UTF-8
+     * @throws IOException if obtaining or reading the request reader fails
+     */
+    public byte[] requestHoconBody(HttpServletRequest req) throws IOException {
+        final StringBuilder body = new StringBuilder();
+
+        try (BufferedReader reader = req.getReader()) {
+            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+                body.append(line).append("\n");
+            }
+        }
+
+        return body.toString().getBytes(StandardCharsets.UTF_8);
+    }
+
     protected Map<String, String> getParameterMap(HttpServletRequest req) {
         Map<String, String> reqParameterMap = new HashMap<>();
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
index b4e61c959c..31b9d60721 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
@@ -27,6 +27,9 @@ import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.util.Map;
 
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.CONFIG_FORMAT;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.HOCON;
+
 public class SubmitJobServlet extends BaseServlet {
     private final JobInfoService jobInfoService;
 
@@ -39,7 +42,10 @@ public class SubmitJobServlet extends BaseServlet {
     public void doPost(HttpServletRequest req, HttpServletResponse resp) 
throws IOException {
 
         Map<String, String> requestParams = getParameterMap(req);
-
-        writeJson(resp, jobInfoService.submitJob(requestParams, 
requestBody(req)));
+        // A "format" request parameter equal to "hocon" (compared case-insensitively) makes
+        // the body be read by requestHoconBody, which re-joins the body lines with "\n".
+        // Any other value, or no "format" parameter at all, goes through requestBody.
+        if (HOCON.equalsIgnoreCase(requestParams.get(CONFIG_FORMAT))) {
+            writeJson(resp, jobInfoService.submitJob(requestParams, 
requestHoconBody(req)));
+        } else {
+            writeJson(resp, jobInfoService.submitJob(requestParams, 
requestBody(req)));
+        }
     }
 }


Reply via email to