This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1d9a8b3fcd [Feature] [rest-api]  Support Rest Api to upload file and 
submit task (#8442)
1d9a8b3fcd is described below

commit 1d9a8b3fcd9ebb7aec6e24f8b2a34ba797f97ac1
Author: fcb-xiaobo <[email protected]>
AuthorDate: Fri Jan 10 09:14:37 2025 +0800

    [Feature] [rest-api]  Support Rest Api to upload file and submit task 
(#8442)
---
 docs/en/seatunnel-engine/rest-api-v2.md            | 34 ++++++++++++
 docs/zh/seatunnel-engine/rest-api-v2.md            | 33 +++++++++++
 .../e2e/ClusterSeaTunnelEngineContainer.java       | 30 ++++++++++
 .../resources/upload-file/fake_to_console.conf     | 48 ++++++++++++++++
 .../resources/upload-file/fake_to_console.json     | 27 +++++++++
 .../seatunnel/engine/server/JettyService.java      | 11 +++-
 .../seatunnel/engine/server/rest/RestConstant.java |  3 +
 .../engine/server/rest/service/JobInfoService.java |  9 +++
 .../rest/servlet/SubmitJobByUploadFileServlet.java | 64 ++++++++++++++++++++++
 9 files changed, 258 insertions(+), 1 deletion(-)

diff --git a/docs/en/seatunnel-engine/rest-api-v2.md 
b/docs/en/seatunnel-engine/rest-api-v2.md
index 27ab552f2d..01dc947911 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -483,6 +483,40 @@ sink {
 
 
------------------------------------------------------------------------------------------
 
+### Submit A Job By Upload Config File
+
+<details>
+<summary><code>POST</code> <code><b>/submit-job/upload</b></code> 
<code>(Returns jobId and jobName if job submitted 
successfully.)</code></summary>
+
+#### Parameters
+
+> | name                 |   type   | data type |            description       
     |
+> 
|----------------------|----------|-----------|-----------------------------------|
+> | jobId                | optional | string    | job id                       
     |
+> | jobName              | optional | string    | job name                     
     |
+> | isStartWithSavePoint | optional | string    | if job is started with save 
point |
+
+#### Request Body
+Upload the config file as a multipart form field named `config_file`. A file whose name ends with `.json` is parsed as JSON; a file with a `.conf` or `.config` suffix is parsed as HOCON.
+
+curl Example :
+```
+curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 
'config_file=@"/temp/fake_to_console.conf"'
+
+```
+#### Responses
+
+```json
+{
+    "jobId": 733584788375666689,
+    "jobName": "SeaTunnel_Job"
+}
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
+
 ### Batch Submit Jobs
 
 <details>
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md 
b/docs/zh/seatunnel-engine/rest-api-v2.md
index 7a3679b216..bf118fe1c1 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -463,7 +463,40 @@ sink {
 </details>
 
 
------------------------------------------------------------------------------------------
+### 通过上传配置文件提交作业
 
+<details>
+<summary><code>POST</code> <code><b>/submit-job/upload</b></code> 
<code>(如果作业提交成功,返回jobId和jobName。)</code></summary>
+
+#### 参数
+
+> |         参数名称         |   是否必传   |  参数类型  | 参数描述                            
  |
+> 
|----------------------|----------|-----------------------------------|-----------------------------------|
+> | jobId                | optional | string | job id                          
  |
+> | jobName              | optional | string | job name                        
  |
+> | isStartWithSavePoint | optional | string | if job is started with save 
point |
+
+#### 请求体
+上传文件key的名称是config_file,文件后缀json的按照json格式来解析,conf或config文件后缀按照hocon格式解析
+
+curl Example
+
+```
+curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 
'config_file=@"/temp/fake_to_console.conf"'
+
+```
+#### 响应
+
+```json
+{
+    "jobId": 733584788375666689,
+    "jobName": "SeaTunnel_Job"
+}
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
 
 ### 批量提交作业
 
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
index 671249f28c..718d725d91 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
@@ -37,7 +37,9 @@ import com.hazelcast.jet.json.JsonUtil;
 import io.restassured.response.Response;
 import scala.Tuple3;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -253,6 +255,34 @@ public class ClusterSeaTunnelEngineContainer extends 
SeaTunnelEngineContainer {
                         });
     }
 
+    /**
+     * Uploads every file found in the {@code upload-file} test resource directory to the
+     * submit-job-by-upload endpoint of each server and expects HTTP 200 for each upload.
+     */
+    @Test
+    public void testRestApiSubmitJobByUploadFileV2() {
+        URL resource = this.getClass().getClassLoader().getResource("upload-file");
+        Assertions.assertNotNull(resource, "Test resource directory 'upload-file' not found");
+        File[] files = new File(resource.getFile()).listFiles();
+        // listFiles() returns null when the path is not a readable directory, and an empty
+        // array would let the test pass without uploading anything.
+        Assertions.assertNotNull(files, "Cannot list files under " + resource);
+        Assertions.assertTrue(files.length > 0, "No config files found under " + resource);
+
+        Tuple3<Integer, String, Long> task = tasks.get(1);
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            for (File file : files) {
+                                Response response =
+                                        given().multiPart("config_file", file)
+                                                .baseUri(
+                                                        http
+                                                                + container.getHost()
+                                                                + colon
+                                                                + task._1())
+                                                .basePath(
+                                                        RestConstant
+                                                                .REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE)
+                                                .when()
+                                                .post();
+                                Assertions.assertEquals(200, response.getStatusCode());
+                            }
+                        });
+    }
+
     @Test
     public void testStopJob() {
         AtomicInteger i = new AtomicInteger();
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.conf
new file mode 100644
index 0000000000..393c5c2e52
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.conf
@@ -0,0 +1,48 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  FakeSource {
+    plugin_output = "fake"
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  console {
+    plugin_input="fake"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.json
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.json
new file mode 100644
index 0000000000..73ab4447e7
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.json
@@ -0,0 +1,27 @@
+{
+    "env": {
+        "job.mode": "batch"
+    },
+    "source": [
+        {
+            "plugin_name": "FakeSource",
+            "plugin_output": "fake",
+            "row.num": 100,
+            "schema": {
+                "fields": {
+                    "name": "string",
+                    "age": "int",
+                    "card": "int"
+                }
+            }
+        }
+    ],
+    "transform": [
+    ],
+    "sink": [
+        {
+            "plugin_name": "Console",
+            "plugin_input": ["fake"]
+        }
+    ]
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
index 8af15a3325..4d9b75abf5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.engine.server.rest.servlet.RunningJobsServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.RunningThreadsServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.StopJobServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.StopJobsServlet;
+import 
org.apache.seatunnel.engine.server.rest.servlet.SubmitJobByUploadFileServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobsServlet;
 import org.apache.seatunnel.engine.server.rest.servlet.SystemMonitoringServlet;
@@ -47,6 +48,7 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
 import lombok.extern.slf4j.Slf4j;
 
 import javax.servlet.DispatcherType;
+import javax.servlet.MultipartConfigElement;
 
 import java.io.IOException;
 import java.net.DatagramSocket;
@@ -70,6 +72,7 @@ import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_STOP
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_STOP_JOBS;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOB;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOBS;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SYSTEM_MONITORING_INFORMATION;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_THREAD_DUMP;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_UPDATE_TAGS;
@@ -122,6 +125,9 @@ public class JettyService {
         ServletHolder threadDumpHolder = new ServletHolder(new 
ThreadDumpServlet(nodeEngine));
 
         ServletHolder submitJobHolder = new ServletHolder(new 
SubmitJobServlet(nodeEngine));
+        ServletHolder submitJobByUploadFileHolder =
+                new ServletHolder(new 
SubmitJobByUploadFileServlet(nodeEngine));
+
         ServletHolder submitJobsHolder = new ServletHolder(new 
SubmitJobsServlet(nodeEngine));
         ServletHolder stopJobHolder = new ServletHolder(new 
StopJobServlet(nodeEngine));
         ServletHolder stopJobsHolder = new ServletHolder(new 
StopJobsServlet(nodeEngine));
@@ -147,7 +153,10 @@ public class JettyService {
         context.addServlet(jobInfoHolder, convertUrlToPath(REST_URL_JOB_INFO));
         context.addServlet(jobInfoHolder, 
convertUrlToPath(REST_URL_RUNNING_JOB));
         context.addServlet(threadDumpHolder, 
convertUrlToPath(REST_URL_THREAD_DUMP));
-
+        MultipartConfigElement multipartConfigElement = new 
MultipartConfigElement("");
+        
submitJobByUploadFileHolder.getRegistration().setMultipartConfig(multipartConfigElement);
+        context.addServlet(
+                submitJobByUploadFileHolder, 
convertUrlToPath(REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE));
         context.addServlet(submitJobHolder, 
convertUrlToPath(REST_URL_SUBMIT_JOB));
         context.addServlet(submitJobsHolder, 
convertUrlToPath(REST_URL_SUBMIT_JOBS));
         context.addServlet(stopJobHolder, convertUrlToPath(REST_URL_STOP_JOB));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index e55f76cf54..810e08453e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -75,6 +75,9 @@ public class RestConstant {
     public static final String REST_URL_SYSTEM_MONITORING_INFORMATION =
             "/system-monitoring-information";
     public static final String REST_URL_SUBMIT_JOB = "/submit-job";
+
+    public static final String REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE = 
"/submit-job/upload";
+
     public static final String REST_URL_SUBMIT_JOBS = "/submit-jobs";
     public static final String REST_URL_STOP_JOB = "/stop-job";
     public static final String REST_URL_STOP_JOBS = "/stop-jobs";
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
index 7d21c2023c..22d3138aee 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
@@ -173,6 +173,15 @@ public class JobInfoService extends BaseService {
         return submitJobInternal(config, requestParams, seaTunnelServer, 
nodeEngine.getNode());
     }
 
+    /**
+     * Submits a job from an already parsed config.
+     *
+     * @param requestParams request parameters of the submit call
+     * @param config parsed job config
+     * @return the result of {@code submitJobInternal}
+     * @throws IllegalArgumentException if start-with-save-point is requested without a job id
+     */
+    public JsonObject submitJob(Map<String, String> requestParams, Config config) {
+        boolean startWithSavePoint =
+                Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT));
+        // Restoring from a save point only makes sense for a known job id.
+        if (startWithSavePoint && requestParams.get(RestConstant.JOB_ID) == null) {
+            throw new IllegalArgumentException("Please provide jobId when start with save point.");
+        }
+        return submitJobInternal(
+                config, requestParams, getSeaTunnelServer(false), nodeEngine.getNode());
+    }
+
     public JsonArray submitJobs(byte[] requestBody) {
         List<Tuple2<Map<String, String>, Config>> configTuples =
                 RestUtil.buildConfigList(requestHandle(requestBody), false);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
new file mode 100644
index 0000000000..2db376da06
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.servlet;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;
+
+import org.apache.seatunnel.engine.server.rest.service.JobInfoService;
+
+import org.apache.commons.io.IOUtils;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Servlet that submits a job from a config file uploaded as multipart form data.
+ *
+ * <p>The file is read from the {@code config_file} form field. A file whose submitted name ends
+ * with {@code .json} is parsed with JSON syntax; any other upload is parsed without an explicit
+ * syntax (documented as HOCON in the REST API docs).
+ */
+public class SubmitJobByUploadFileServlet extends BaseServlet {
+    /** Name of the multipart form field that carries the uploaded job config file. */
+    private static final String CONFIG_FILE_PART_NAME = "config_file";
+
+    private final JobInfoService jobInfoService;
+
+    public SubmitJobByUploadFileServlet(NodeEngineImpl nodeEngine) {
+        super(nodeEngine);
+        this.jobInfoService = new JobInfoService(nodeEngine);
+    }
+
+    /**
+     * Parses the uploaded config file, submits it as a job and writes the submission result to
+     * the response as JSON.
+     *
+     * @throws IllegalArgumentException if the request carries no {@code config_file} part
+     */
+    @Override
+    public void doPost(HttpServletRequest req, HttpServletResponse resp)
+            throws IOException, ServletException {
+        Part filePart = req.getPart(CONFIG_FILE_PART_NAME);
+        if (filePart == null) {
+            // getPart returns null when the form field is absent; report that explicitly
+            // instead of failing with a NullPointerException below.
+            throw new IllegalArgumentException(
+                    "Please upload the job config file in the multipart field 'config_file'.");
+        }
+
+        String content;
+        // IOUtils.toString leaves the stream open, so close it here.
+        try (InputStream inputStream = filePart.getInputStream()) {
+            content = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+        }
+
+        // The submitted file name may be null when the part carries no filename; such uploads
+        // fall through to the non-JSON parser.
+        String submittedFileName = filePart.getSubmittedFileName();
+        Config config;
+        if (submittedFileName != null && submittedFileName.endsWith(".json")) {
+            config =
+                    ConfigFactory.parseString(
+                            content, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON));
+        } else {
+            config = ConfigFactory.parseString(content);
+        }
+        writeJson(resp, jobInfoService.submitJob(getParameterMap(req), config));
+    }
+}

Reply via email to