This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dd2fe00761 [Feature][Zeta][REST-API]Add REST API To Submit Job (#5107)
dd2fe00761 is described below

commit dd2fe0076196f4bfb24b702889456791f788c500
Author: fang <[email protected]>
AuthorDate: Tue Aug 8 11:56:15 2023 +0800

    [Feature][Zeta][REST-API]Add REST API To Submit Job (#5107)
---
 docs/en/seatunnel-engine/rest-api.md               |  58 +++++++++
 .../core/starter/utils/ConfigBuilder.java          |   6 +
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java |  72 +++++++++++
 .../engine/client/job/JobExecutionEnvironment.java |  91 +-------------
 .../engine/core/job/AbstractJobEnvironment.java}   |  78 +++---------
 .../core/parse/MultipleTableJobConfigParser.java   |  16 +++
 .../seatunnel/engine/server/NodeExtension.java     |   2 +
 .../server/job/JobImmutableInformationEnv.java     |  80 ++++++++++++
 .../seatunnel/engine/server/rest/RestConstant.java |   1 +
 .../server/rest/RestHttpPostCommandProcessor.java  | 135 +++++++++++++++++++++
 .../seatunnel/engine/server/utils/RestUtil.java    |  65 ++++++++++
 11 files changed, 456 insertions(+), 148 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api.md 
b/docs/en/seatunnel-engine/rest-api.md
index 2edec3496a..2f44421a3d 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -180,3 +180,61 @@ network:
 
 
------------------------------------------------------------------------------------------
 
+### Submit Job.
+
+<details>
+<summary><code>POST</code> <code><b>/hazelcast/rest/maps/submit-job</b></code> <code>(Returns jobId and jobName if job submitted successfully.)</code></summary>
+
+#### Parameters
+
+> |         name         |   type   | data type |            description            |
+> |----------------------|----------|-----------|-----------------------------------|
+> | jobId                | optional | string    | job id                            |
+> | jobName              | optional | string    | job name                          |
+> | isStartWithSavePoint | optional | string    | if job is started with save point |
+
+#### Body
+
+```json
+{
+    "env": {
+        "job.mode": "batch"
+    },
+    "source": [
+        {
+            "plugin_name": "FakeSource",
+            "result_table_name": "fake",
+            "row.num": 100,
+            "schema": {
+                "fields": {
+                    "name": "string",
+                    "age": "int",
+                    "card": "int"
+                }
+            }
+        }
+    ],
+    "transform": [
+    ],
+    "sink": [
+        {
+            "plugin_name": "Console",
+            "source_table_name": ["fake"]
+        }
+    ]
+}
+```
+
+#### Responses
+
+```json
+{
+    "jobId": 733584788375666689,
+    "jobName": "rest_api_test"
+}
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
+
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
index ed66b550a0..ad063acac8 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
@@ -69,6 +69,12 @@ public class ConfigBuilder {
         return config;
     }
 
+    public static Config of(@NonNull Map<String, Object> objectMap) {
+        log.info("Loading config file from objectMap");
+        Config config = ConfigFactory.parseMap(objectMap);
+        return ConfigShadeUtils.decryptConfig(config);
+    }
+
     public static Config of(@NonNull ConfigAdapter configAdapter, @NonNull 
Path filePath) {
         log.info("With config adapter spi {}", 
configAdapter.getClass().getName());
         try {
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 5f4e97ac8d..d38d1c732f 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -22,10 +22,12 @@ import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 import org.apache.seatunnel.engine.server.rest.RestConstant;
 
@@ -37,6 +39,7 @@ import org.junit.jupiter.api.Test;
 
 import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.concurrent.TimeUnit;
@@ -131,6 +134,75 @@ public class RestApiIT {
                 .statusCode(200);
     }
 
+    @Test
+    public void testSubmitJob() {
+        String requestBody =
+                "{\n"
+                        + "    \"env\": {\n"
+                        + "        \"job.mode\": \"batch\"\n"
+                        + "    },\n"
+                        + "    \"source\": [\n"
+                        + "        {\n"
+                        + "            \"plugin_name\": \"FakeSource\",\n"
+                        + "            \"result_table_name\": \"fake\",\n"
+                        + "            \"row.num\": 100,\n"
+                        + "            \"schema\": {\n"
+                        + "                \"fields\": {\n"
+                        + "                    \"name\": \"string\",\n"
+                        + "                    \"age\": \"int\",\n"
+                        + "                    \"card\": \"int\"\n"
+                        + "                }\n"
+                        + "            }\n"
+                        + "        }\n"
+                        + "    ],\n"
+                        + "    \"transform\": [\n"
+                        + "    ],\n"
+                        + "    \"sink\": [\n"
+                        + "        {\n"
+                        + "            \"plugin_name\": \"Console\",\n"
+                        + "            \"source_table_name\": [\"fake\"]\n"
+                        + "        }\n"
+                        + "    ]\n"
+                        + "}";
+        String parameters = "jobId=1&jobName=test&isStartWithSavePoint=false";
+        // Only jobName is compared because jobId is randomly generated if 
isStartWithSavePoint is
+        // false
+        Response response =
+                given().body(requestBody)
+                        .post(
+                                HOST
+                                        + hazelcastInstance
+                                                .getCluster()
+                                                .getLocalMember()
+                                                .getAddress()
+                                                .getPort()
+                                        + RestConstant.SUBMIT_JOB_URL
+                                        + "?"
+                                        + parameters);
+
+        response.then().statusCode(200).body("jobName", equalTo("test"));
+        String jobId = response.getBody().jsonPath().getString("jobId");
+        SeaTunnelServer seaTunnelServer =
+                (SeaTunnelServer)
+                        hazelcastInstance
+                                .node
+                                .getNodeExtension()
+                                .createExtensionServices()
+                                .get(Constant.SEATUNNEL_SERVICE_NAME);
+        JobStatus jobStatus =
+                
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
+        Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.FINISHED,
+                                        seaTunnelServer
+                                                .getCoordinatorService()
+                                                
.getJobStatus(Long.parseLong(jobId))));
+    }
+
     @AfterAll
     static void afterClass() {
         if (hazelcastInstance != null) {
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index bf3169e4c8..3f870c6121 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -18,55 +18,19 @@
 package org.apache.seatunnel.engine.client.job;
 
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
-import org.apache.seatunnel.engine.common.utils.IdGenerator;
-import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
-import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-public class JobExecutionEnvironment {
-
-    private static final ILogger LOGGER = 
Logger.getLogger(JobExecutionEnvironment.class);
-
-    private final boolean isStartWithSavePoint;
-
-    private final JobConfig jobConfig;
-
-    private final List<Action> actions = new ArrayList<>();
-
-    private final Set<URL> jarUrls = new HashSet<>();
 
-    private final List<URL> commonPluginJars = new ArrayList<>();
+public class JobExecutionEnvironment extends AbstractJobEnvironment {
 
     private final String jobFilePath;
 
-    private final IdGenerator idGenerator;
-
     private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
 
     private final JobClient jobClient;
@@ -78,35 +42,12 @@ public class JobExecutionEnvironment {
             SeaTunnelHazelcastClient seaTunnelHazelcastClient,
             boolean isStartWithSavePoint,
             Long jobId) {
-        this.jobConfig = jobConfig;
+        super(jobConfig, isStartWithSavePoint);
         this.jobFilePath = jobFilePath;
-        this.idGenerator = new IdGenerator();
         this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
         this.jobClient = new JobClient(seaTunnelHazelcastClient);
-        this.isStartWithSavePoint = isStartWithSavePoint;
         this.jobConfig.setJobContext(
                 new JobContext(isStartWithSavePoint ? jobId : 
jobClient.getNewJobId()));
-        this.commonPluginJars.addAll(searchPluginJars());
-        this.commonPluginJars.addAll(
-                new ArrayList<>(
-                        Common.getThirdPartyJars(
-                                        jobConfig
-                                                .getEnvOptions()
-                                                
.getOrDefault(EnvCommonOptions.JARS.key(), "")
-                                                .toString())
-                                .stream()
-                                .map(Path::toUri)
-                                .map(
-                                        uri -> {
-                                            try {
-                                                return uri.toURL();
-                                            } catch (MalformedURLException e) {
-                                                throw new 
SeaTunnelEngineException(
-                                                        "the uri of jar 
illegal:" + uri, e);
-                                            }
-                                        })
-                                .collect(Collectors.toList())));
-        LOGGER.info("add common jar in plugins :" + commonPluginJars);
     }
 
     public JobExecutionEnvironment(
@@ -117,27 +58,12 @@ public class JobExecutionEnvironment {
     }
 
     /** Search all jars in SEATUNNEL_HOME/plugins */
-    private Set<URL> searchPluginJars() {
-        try {
-            if (Files.exists(Common.pluginRootDir())) {
-                return new 
HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
-            }
-        } catch (IOException | SeaTunnelEngineException e) {
-            LOGGER.warning(
-                    String.format("Can't search plugin jars in %s.", 
Common.pluginRootDir()), e);
-        }
-        return Collections.emptySet();
-    }
-
-    private MultipleTableJobConfigParser getJobConfigParser() {
+    @Override
+    protected MultipleTableJobConfigParser getJobConfigParser() {
         return new MultipleTableJobConfigParser(
                 jobFilePath, idGenerator, jobConfig, commonPluginJars, 
isStartWithSavePoint);
     }
 
-    private LogicalDagGenerator getLogicalDagGenerator() {
-        return new LogicalDagGenerator(actions, jobConfig, idGenerator);
-    }
-
     public ClientJobProxy execute() throws ExecutionException, 
InterruptedException {
         JobImmutableInformation jobImmutableInformation =
                 new JobImmutableInformation(
@@ -150,11 +76,4 @@ public class JobExecutionEnvironment {
 
         return jobClient.createJobProxy(jobImmutableInformation);
     }
-
-    private LogicalDag getLogicalDag() {
-        ImmutablePair<List<Action>, Set<URL>> immutablePair = 
getJobConfigParser().parse();
-        actions.addAll(immutablePair.getLeft());
-        jarUrls.addAll(immutablePair.getRight());
-        return getLogicalDagGenerator().generate();
-    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
similarity index 59%
copy from 
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
copy to 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
index bf3169e4c8..3509903c08 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client.job;
+package org.apache.seatunnel.engine.core.job;
 
-import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.utils.FileUtils;
-import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
-import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -46,46 +43,27 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-public class JobExecutionEnvironment {
+public abstract class AbstractJobEnvironment {
+    protected static ILogger LOGGER = null;
 
-    private static final ILogger LOGGER = 
Logger.getLogger(JobExecutionEnvironment.class);
+    protected final boolean isStartWithSavePoint;
 
-    private final boolean isStartWithSavePoint;
+    protected final List<Action> actions = new ArrayList<>();
+    protected final Set<URL> jarUrls = new HashSet<>();
 
-    private final JobConfig jobConfig;
+    protected final JobConfig jobConfig;
 
-    private final List<Action> actions = new ArrayList<>();
+    protected final IdGenerator idGenerator;
 
-    private final Set<URL> jarUrls = new HashSet<>();
+    protected final List<URL> commonPluginJars = new ArrayList<>();
 
-    private final List<URL> commonPluginJars = new ArrayList<>();
-
-    private final String jobFilePath;
-
-    private final IdGenerator idGenerator;
-
-    private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
-
-    private final JobClient jobClient;
-
-    /** If the JobId is not empty, it is used to restore job from savePoint */
-    public JobExecutionEnvironment(
-            JobConfig jobConfig,
-            String jobFilePath,
-            SeaTunnelHazelcastClient seaTunnelHazelcastClient,
-            boolean isStartWithSavePoint,
-            Long jobId) {
+    public AbstractJobEnvironment(JobConfig jobConfig, boolean 
isStartWithSavePoint) {
+        LOGGER = Logger.getLogger(getClass().getName());
         this.jobConfig = jobConfig;
-        this.jobFilePath = jobFilePath;
-        this.idGenerator = new IdGenerator();
-        this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
-        this.jobClient = new JobClient(seaTunnelHazelcastClient);
         this.isStartWithSavePoint = isStartWithSavePoint;
-        this.jobConfig.setJobContext(
-                new JobContext(isStartWithSavePoint ? jobId : 
jobClient.getNewJobId()));
+        this.idGenerator = new IdGenerator();
         this.commonPluginJars.addAll(searchPluginJars());
         this.commonPluginJars.addAll(
                 new ArrayList<>(
@@ -109,15 +87,7 @@ public class JobExecutionEnvironment {
         LOGGER.info("add common jar in plugins :" + commonPluginJars);
     }
 
-    public JobExecutionEnvironment(
-            JobConfig jobConfig,
-            String jobFilePath,
-            SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
-        this(jobConfig, jobFilePath, seaTunnelHazelcastClient, false, null);
-    }
-
-    /** Search all jars in SEATUNNEL_HOME/plugins */
-    private Set<URL> searchPluginJars() {
+    protected Set<URL> searchPluginJars() {
         try {
             if (Files.exists(Common.pluginRootDir())) {
                 return new 
HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
@@ -129,29 +99,13 @@ public class JobExecutionEnvironment {
         return Collections.emptySet();
     }
 
-    private MultipleTableJobConfigParser getJobConfigParser() {
-        return new MultipleTableJobConfigParser(
-                jobFilePath, idGenerator, jobConfig, commonPluginJars, 
isStartWithSavePoint);
-    }
+    protected abstract MultipleTableJobConfigParser getJobConfigParser();
 
-    private LogicalDagGenerator getLogicalDagGenerator() {
+    protected LogicalDagGenerator getLogicalDagGenerator() {
         return new LogicalDagGenerator(actions, jobConfig, idGenerator);
     }
 
-    public ClientJobProxy execute() throws ExecutionException, 
InterruptedException {
-        JobImmutableInformation jobImmutableInformation =
-                new JobImmutableInformation(
-                        Long.parseLong(jobConfig.getJobContext().getJobId()),
-                        jobConfig.getName(),
-                        isStartWithSavePoint,
-                        
seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
-                        jobConfig,
-                        new ArrayList<>(jarUrls));
-
-        return jobClient.createJobProxy(jobImmutableInformation);
-    }
-
-    private LogicalDag getLogicalDag() {
+    protected LogicalDag getLogicalDag() {
         ImmutablePair<List<Action>, Set<URL>> immutablePair = 
getJobConfigParser().parse();
         actions.addAll(immutablePair.getLeft());
         jarUrls.addAll(immutablePair.getRight());
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 86c0f3c94f..ee2505286f 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -130,6 +130,22 @@ public class MultipleTableJobConfigParser {
                 new JobConfigParser(idGenerator, commonPluginJars, 
isStartWithSavePoint);
     }
 
+    public MultipleTableJobConfigParser(
+            Config seaTunnelJobConfig,
+            IdGenerator idGenerator,
+            JobConfig jobConfig,
+            List<URL> commonPluginJars,
+            boolean isStartWithSavePoint) {
+        this.idGenerator = idGenerator;
+        this.jobConfig = jobConfig;
+        this.commonPluginJars = commonPluginJars;
+        this.isStartWithSavePoint = isStartWithSavePoint;
+        this.seaTunnelJobConfig = seaTunnelJobConfig;
+        this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
+        this.fallbackParser =
+                new JobConfigParser(idGenerator, commonPluginJars, 
isStartWithSavePoint);
+    }
+
     public ImmutablePair<List<Action>, Set<URL>> parse() {
         List<? extends Config> sourceConfigs =
                 TypesafeConfigUtils.getConfigList(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
index d4137955c8..37e00cffab 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
 import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor;
 import org.apache.seatunnel.engine.server.rest.RestHttpGetCommandProcessor;
+import org.apache.seatunnel.engine.server.rest.RestHttpPostCommandProcessor;
 
 import com.hazelcast.cluster.ClusterState;
 import com.hazelcast.instance.impl.DefaultNodeExtension;
@@ -79,6 +80,7 @@ public class NodeExtension extends DefaultNodeExtension {
                 register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this));
                 register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this));
                 register(HTTP_GET, new RestHttpGetCommandProcessor(this));
+                register(HTTP_POST, new RestHttpPostCommandProcessor(this));
             }
         };
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java
new file mode 100644
index 0000000000..4dd72e31cb
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.job;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
+
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.util.ArrayList;
+
+public class JobImmutableInformationEnv extends AbstractJobEnvironment {
+    private final Config seaTunnelJobConfig;
+
+    private final NodeEngineImpl nodeEngine;
+
+    private final Long jobId;
+
+    public JobImmutableInformationEnv(
+            JobConfig jobConfig,
+            Config seaTunnelJobConfig,
+            Node node,
+            boolean isStartWithSavePoint,
+            Long jobId) {
+        super(jobConfig, isStartWithSavePoint);
+        this.seaTunnelJobConfig = seaTunnelJobConfig;
+        this.nodeEngine = node.getNodeEngine();
+        this.jobConfig.setJobContext(
+                new JobContext(
+                        isStartWithSavePoint
+                                ? jobId
+                                : nodeEngine
+                                        .getHazelcastInstance()
+                                        
.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)
+                                        .newId()));
+        this.jobId = Long.valueOf(jobConfig.getJobContext().getJobId());
+    }
+
+    public Long getJobId() {
+        return jobId;
+    }
+
+    @Override
+    protected MultipleTableJobConfigParser getJobConfigParser() {
+        return new MultipleTableJobConfigParser(
+                seaTunnelJobConfig, idGenerator, jobConfig, commonPluginJars, 
isStartWithSavePoint);
+    }
+
+    public JobImmutableInformation build() {
+        return new JobImmutableInformation(
+                Long.parseLong(jobConfig.getJobContext().getJobId()),
+                jobConfig.getName(),
+                isStartWithSavePoint,
+                nodeEngine.getSerializationService().toData(getLogicalDag()),
+                jobConfig,
+                new ArrayList<>(jarUrls));
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 0a5d8437be..7776d592b8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -21,6 +21,7 @@ public class RestConstant {
 
     public static final String RUNNING_JOBS_URL = 
"/hazelcast/rest/maps/running-jobs";
     public static final String RUNNING_JOB_URL = 
"/hazelcast/rest/maps/running-job";
+    public static final String SUBMIT_JOB_URL = 
"/hazelcast/rest/maps/submit-job";
 
     public static final String SYSTEM_MONITORING_INFORMATION =
             "/hazelcast/rest/maps/system-monitoring-information";
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
new file mode 100644
index 0000000000..e0edd93203
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.CoordinatorService;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.job.JobImmutableInformationEnv;
+import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor;
+import org.apache.seatunnel.engine.server.utils.RestUtil;
+
+import com.hazelcast.internal.ascii.TextCommandService;
+import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
+import com.hazelcast.internal.ascii.rest.HttpPostCommand;
+import com.hazelcast.internal.json.JsonObject;
+import com.hazelcast.internal.serialization.Data;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400;
+import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOB_URL;
+
+public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostCommand> {
+    private final Log4j2HttpPostCommandProcessor original;
+
+    public RestHttpPostCommandProcessor(TextCommandService textCommandService) 
{
+        this(textCommandService, new 
Log4j2HttpPostCommandProcessor(textCommandService));
+    }
+
+    protected RestHttpPostCommandProcessor(
+            TextCommandService textCommandService,
+            Log4j2HttpPostCommandProcessor log4j2HttpPostCommandProcessor) {
+        super(
+                textCommandService,
+                
textCommandService.getNode().getLogger(Log4j2HttpPostCommandProcessor.class));
+        this.original = log4j2HttpPostCommandProcessor;
+    }
+
+    @Override
+    public void handle(HttpPostCommand httpPostCommand) {
+        String uri = httpPostCommand.getURI();
+        try {
+            if (uri.startsWith(SUBMIT_JOB_URL)) {
+                handleSubmitJob(httpPostCommand, uri);
+            } else {
+                original.handle(httpPostCommand);
+            }
+        } catch (IllegalArgumentException e) {
+            prepareResponse(SC_400, httpPostCommand, exceptionResponse(e));
+        } catch (Throwable e) {
+            logger.warning("An error occurred while handling request " + 
httpPostCommand, e);
+            prepareResponse(SC_500, httpPostCommand, exceptionResponse(e));
+        }
+
+        this.textCommandService.sendResponse(httpPostCommand);
+    }
+
+    private SeaTunnelServer getSeaTunnelServer() {
+        Map<String, Object> extensionServices =
+                
this.textCommandService.getNode().getNodeExtension().createExtensionServices();
+        return (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+    }
+
+    private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
+            throws IllegalArgumentException {
+        Map<String, String> requestParams = new HashMap<>();
+        RestUtil.buildRequestParams(requestParams, uri);
+        byte[] requestBody = httpPostCommand.getData();
+        if (requestBody.length == 0) {
+            throw new IllegalArgumentException("Request body is empty.");
+        }
+        JsonNode requestBodyJsonNode;
+        try {
+            requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Invalid JSON format in request 
body.");
+        }
+        Config config = RestUtil.buildConfig(requestBodyJsonNode);
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName(requestParams.get("jobName"));
+        JobImmutableInformationEnv jobImmutableInformationEnv =
+                new JobImmutableInformationEnv(
+                        jobConfig,
+                        config,
+                        textCommandService.getNode(),
+                        
Boolean.parseBoolean(requestParams.get("isStartWithSavePoint")),
+                        Long.parseLong(requestParams.get("jobId")));
+        JobImmutableInformation jobImmutableInformation = 
jobImmutableInformationEnv.build();
+        CoordinatorService coordinatorService = 
getSeaTunnelServer().getCoordinatorService();
+        Data data =
+                textCommandService
+                        .getNode()
+                        .nodeEngine
+                        .getSerializationService()
+                        .toData(jobImmutableInformation);
+        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+                coordinatorService.submitJob(
+                        Long.parseLong(jobConfig.getJobContext().getJobId()), 
data);
+        voidPassiveCompletableFuture.join();
+
+        Long jobId = jobImmutableInformationEnv.getJobId();
+        this.prepareResponse(
+                httpPostCommand,
+                new JsonObject().add("jobId", jobId).add("jobName", 
requestParams.get("jobName")));
+    }
+
+    @Override
+    public void handleRejection(HttpPostCommand httpPostCommand) {
+        handle(httpPostCommand);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
new file mode 100644
index 0000000000..d3761366d0
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.utils;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+
+import com.hazelcast.internal.util.StringUtil;
+
+import java.io.IOException;
+import java.util.Map;
+
+/** Static helpers for parsing REST request URIs and bodies. Not instantiable. */
+public class RestUtil {
+    private RestUtil() {}
+
+    /** Shared mapper used to parse request bodies. */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * Parses raw bytes into a JSON tree.
+     *
+     * @param byteData the bytes to parse
+     * @return the parsed JSON tree
+     * @throws IOException if the bytes cannot be read as JSON
+     */
+    public static JsonNode convertByteToJsonNode(byte[] byteData) throws IOException {
+        return OBJECT_MAPPER.readTree(byteData);
+    }
+
+    /**
+     * Fills {@code requestParams} with the submit-job defaults and then overlays the
+     * {@code key=value} pairs found in the query string of {@code uri}.
+     *
+     * <p>Defaults: {@code jobId} is {@code null}, {@code jobName} is {@code Constants.LOGO} and
+     * {@code isStartWithSavePoint} is {@code "false"}. Pairs are separated by {@code &}; each pair
+     * is split at its first {@code =} only, so a value may itself contain {@code =}. Keys and
+     * values are stored verbatim.
+     *
+     * @param requestParams map that receives the defaults and the parsed parameters
+     * @param uri request URI, optionally followed by {@code ?} and a query string
+     * @throws IllegalArgumentException if a pair has no {@code =} or nothing after it
+     */
+    public static void buildRequestParams(Map<String, String> requestParams, String uri) {
+        requestParams.put("jobId", null);
+        requestParams.put("jobName", Constants.LOGO);
+        requestParams.put("isStartWithSavePoint", String.valueOf(false));
+        uri = StringUtil.stripTrailingSlash(uri);
+        int queryStart = uri.indexOf('?');
+        if (queryStart < 0) {
+            return;
+        }
+        for (String pair : uri.substring(queryStart + 1).split("&")) {
+            int separator = pair.indexOf('=');
+            // Reject pairs without a value explicitly instead of relying on an index exception.
+            if (separator < 0 || separator == pair.length() - 1) {
+                throw new IllegalArgumentException("Invalid Params format in Params.");
+            }
+            requestParams.put(pair.substring(0, separator), pair.substring(separator + 1));
+        }
+    }
+
+    /**
+     * Converts a JSON tree into a job {@link Config}.
+     *
+     * @param jsonNode the parsed request body
+     * @return the config built from the map form of {@code jsonNode}
+     */
+    public static Config buildConfig(JsonNode jsonNode) {
+        Map<String, Object> objectMap = JsonUtils.toMap(jsonNode);
+        return ConfigBuilder.of(objectMap);
+    }
+}


Reply via email to