This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1d9a8b3fcd [Feature] [rest-api] Support Rest Api to upload file and
submit task (#8442)
1d9a8b3fcd is described below
commit 1d9a8b3fcd9ebb7aec6e24f8b2a34ba797f97ac1
Author: fcb-xiaobo <[email protected]>
AuthorDate: Fri Jan 10 09:14:37 2025 +0800
[Feature] [rest-api] Support Rest Api to upload file and submit task
(#8442)
---
docs/en/seatunnel-engine/rest-api-v2.md | 34 ++++++++++++
docs/zh/seatunnel-engine/rest-api-v2.md | 33 +++++++++++
.../e2e/ClusterSeaTunnelEngineContainer.java | 30 ++++++++++
.../resources/upload-file/fake_to_console.conf | 48 ++++++++++++++++
.../resources/upload-file/fake_to_console.json | 27 +++++++++
.../seatunnel/engine/server/JettyService.java | 11 +++-
.../seatunnel/engine/server/rest/RestConstant.java | 3 +
.../engine/server/rest/service/JobInfoService.java | 9 +++
.../rest/servlet/SubmitJobByUploadFileServlet.java | 64 ++++++++++++++++++++++
9 files changed, 258 insertions(+), 1 deletion(-)
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md
b/docs/en/seatunnel-engine/rest-api-v2.md
index 27ab552f2d..01dc947911 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -483,6 +483,40 @@ sink {
------------------------------------------------------------------------------------------
+### Submit A Job By Upload Config File
+
+<details>
+<summary><code>POST</code> <code><b>/submit-job/upload</b></code>
<code>(Returns jobId and jobName if job submitted
successfully.)</code></summary>
+
+#### Parameters
+
+> | name | type | data type | description
|
+>
|----------------------|----------|-----------|-----------------------------------|
+> | jobId | optional | string | job id
|
+> | jobName | optional | string | job name
|
+> | isStartWithSavePoint | optional | string | if job is started with save
point |
+
+#### Request Body
+The name of the uploaded file key is config_file, and the file suffix json is
parsed in json format. The conf or config file suffix is parsed in hocon format
+
+curl Example :
+```
+curl --location 'http://127.0.0.1:8080/submit-job/upload' --form
'config_file=@"/temp/fake_to_console.conf"'
+
+```
+#### Responses
+
+```json
+{
+ "jobId": 733584788375666689,
+ "jobName": "SeaTunnel_Job"
+}
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
+
### Batch Submit Jobs
<details>
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md
b/docs/zh/seatunnel-engine/rest-api-v2.md
index 7a3679b216..bf118fe1c1 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -463,7 +463,40 @@ sink {
</details>
------------------------------------------------------------------------------------------
+### 提交作业来源上传配置文件
+<details>
+<summary><code>POST</code> <code><b>/submit-job</b></code>
<code>(如果作业提交成功,返回jobId和jobName。)</code></summary>
+
+#### 参数
+
+> | 参数名称 | 是否必传 | 参数类型 | 参数描述
|
+>
|----------------------|----------|-----------------------------------|-----------------------------------|
+> | jobId | optional | string | job id
|
+> | jobName | optional | string | job name
|
+> | isStartWithSavePoint | optional | string | if job is started with save
point |
+
+#### 请求体
+上传文件key的名称是config_file,文件后缀json的按照json格式来解析,conf或config文件后缀按照hocon格式解析
+
+curl Example
+
+```
+curl --location 'http://127.0.0.1:8080/submit-job/upload' --form
'config_file=@"/temp/fake_to_console.conf"'
+
+```
+#### 响应
+
+```json
+{
+ "jobId": 733584788375666689,
+ "jobName": "SeaTunnel_Job"
+}
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
### 批量提交作业
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
index 671249f28c..718d725d91 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
@@ -37,7 +37,9 @@ import com.hazelcast.jet.json.JsonUtil;
import io.restassured.response.Response;
import scala.Tuple3;
+import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -253,6 +255,34 @@ public class ClusterSeaTunnelEngineContainer extends
SeaTunnelEngineContainer {
});
}
+ @Test
+ public void testRestApiSubmitJobByUploadFileV2() {
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3<Integer, String, Long> task = tasks.get(1);
+ URL resource =
+
this.getClass().getClassLoader().getResource("upload-file");
+ File fileDirect = new File(resource.getFile());
+ File[] files = fileDirect.listFiles();
+ for (File file : files) {
+ Response response =
+ given().multiPart("config_file", file)
+ .baseUri(
+ http
+ +
container.getHost()
+ + colon
+ + task._1())
+ .basePath(
+ RestConstant
+
.REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE)
+ .when()
+ .post();
+ Assertions.assertEquals(200,
response.getStatusCode());
+ }
+ });
+ }
+
@Test
public void testStopJob() {
AtomicInteger i = new AtomicInteger();
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.conf
new file mode 100644
index 0000000000..393c5c2e52
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.conf
@@ -0,0 +1,48 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ plugin_output = "fake"
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ plugin_input="fake"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.json
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.json
new file mode 100644
index 0000000000..73ab4447e7
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.json
@@ -0,0 +1,27 @@
+{
+ "env": {
+ "job.mode": "batch"
+ },
+ "source": [
+ {
+ "plugin_name": "FakeSource",
+ "plugin_output": "fake",
+ "row.num": 100,
+ "schema": {
+ "fields": {
+ "name": "string",
+ "age": "int",
+ "card": "int"
+ }
+ }
+ }
+ ],
+ "transform": [
+ ],
+ "sink": [
+ {
+ "plugin_name": "Console",
+ "plugin_input": ["fake"]
+ }
+ ]
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
index 8af15a3325..4d9b75abf5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
@@ -37,6 +37,7 @@ import
org.apache.seatunnel.engine.server.rest.servlet.RunningJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.RunningThreadsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.StopJobServlet;
import org.apache.seatunnel.engine.server.rest.servlet.StopJobsServlet;
+import
org.apache.seatunnel.engine.server.rest.servlet.SubmitJobByUploadFileServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SystemMonitoringServlet;
@@ -47,6 +48,7 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.DispatcherType;
+import javax.servlet.MultipartConfigElement;
import java.io.IOException;
import java.net.DatagramSocket;
@@ -70,6 +72,7 @@ import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_STOP
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_STOP_JOBS;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOB;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOBS;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SYSTEM_MONITORING_INFORMATION;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_THREAD_DUMP;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_UPDATE_TAGS;
@@ -122,6 +125,9 @@ public class JettyService {
ServletHolder threadDumpHolder = new ServletHolder(new
ThreadDumpServlet(nodeEngine));
ServletHolder submitJobHolder = new ServletHolder(new
SubmitJobServlet(nodeEngine));
+ ServletHolder submitJobByUploadFileHolder =
+ new ServletHolder(new
SubmitJobByUploadFileServlet(nodeEngine));
+
ServletHolder submitJobsHolder = new ServletHolder(new
SubmitJobsServlet(nodeEngine));
ServletHolder stopJobHolder = new ServletHolder(new
StopJobServlet(nodeEngine));
ServletHolder stopJobsHolder = new ServletHolder(new
StopJobsServlet(nodeEngine));
@@ -147,7 +153,10 @@ public class JettyService {
context.addServlet(jobInfoHolder, convertUrlToPath(REST_URL_JOB_INFO));
context.addServlet(jobInfoHolder,
convertUrlToPath(REST_URL_RUNNING_JOB));
context.addServlet(threadDumpHolder,
convertUrlToPath(REST_URL_THREAD_DUMP));
-
+ MultipartConfigElement multipartConfigElement = new
MultipartConfigElement("");
+
submitJobByUploadFileHolder.getRegistration().setMultipartConfig(multipartConfigElement);
+ context.addServlet(
+ submitJobByUploadFileHolder,
convertUrlToPath(REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE));
context.addServlet(submitJobHolder,
convertUrlToPath(REST_URL_SUBMIT_JOB));
context.addServlet(submitJobsHolder,
convertUrlToPath(REST_URL_SUBMIT_JOBS));
context.addServlet(stopJobHolder, convertUrlToPath(REST_URL_STOP_JOB));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index e55f76cf54..810e08453e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -75,6 +75,9 @@ public class RestConstant {
public static final String REST_URL_SYSTEM_MONITORING_INFORMATION =
"/system-monitoring-information";
public static final String REST_URL_SUBMIT_JOB = "/submit-job";
+
+ public static final String REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE =
"/submit-job/upload";
+
public static final String REST_URL_SUBMIT_JOBS = "/submit-jobs";
public static final String REST_URL_STOP_JOB = "/stop-job";
public static final String REST_URL_STOP_JOBS = "/stop-jobs";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
index 7d21c2023c..22d3138aee 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
@@ -173,6 +173,15 @@ public class JobInfoService extends BaseService {
return submitJobInternal(config, requestParams, seaTunnelServer,
nodeEngine.getNode());
}
+ public JsonObject submitJob(Map<String, String> requestParams, Config
config) {
+ if
(Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT))
+ && requestParams.get(RestConstant.JOB_ID) == null) {
+ throw new IllegalArgumentException("Please provide jobId when
start with save point.");
+ }
+ SeaTunnelServer seaTunnelServer = getSeaTunnelServer(false);
+ return submitJobInternal(config, requestParams, seaTunnelServer,
nodeEngine.getNode());
+ }
+
public JsonArray submitJobs(byte[] requestBody) {
List<Tuple2<Map<String, String>, Config>> configTuples =
RestUtil.buildConfigList(requestHandle(requestBody), false);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
new file mode 100644
index 0000000000..2db376da06
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.servlet;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;
+
+import org.apache.seatunnel.engine.server.rest.service.JobInfoService;
+
+import org.apache.commons.io.IOUtils;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class SubmitJobByUploadFileServlet extends BaseServlet {
+ private final JobInfoService jobInfoService;
+
+ public SubmitJobByUploadFileServlet(NodeEngineImpl nodeEngine) {
+ super(nodeEngine);
+ this.jobInfoService = new JobInfoService(nodeEngine);
+ }
+
+ @Override
+ public void doPost(HttpServletRequest req, HttpServletResponse resp)
+ throws IOException, ServletException {
+
+ Part filePart = req.getPart("config_file");
+ String submittedFileName = filePart.getSubmittedFileName();
+ String content = IOUtils.toString(filePart.getInputStream(),
StandardCharsets.UTF_8);
+ Config config;
+ if (submittedFileName.endsWith(".json")) {
+ config =
+ ConfigFactory.parseString(
+ content,
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON));
+ } else {
+ config = ConfigFactory.parseString(content);
+ }
+ writeJson(resp, jobInfoService.submitJob(getParameterMap(req),
config));
+ }
+}