This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new c7e92f019 [ST-Engine][Example] Add seatunnel example module (#2659)
c7e92f019 is described below

commit c7e92f01928e33a92dd9525200097579dd21db1a
Author: Hisoka <[email protected]>
AuthorDate: Thu Sep 8 21:51:01 2022 +0800

    [ST-Engine][Example] Add seatunnel example module (#2659)
    
    * [Engine][Example] Add seatunnel example module
    
    * [Engine] [Core] Add seatunnel engine example
    
    * [Engine] [Core] Add seatunnel engine example
    
    * [Engine] [Example] Add random cluster name when use local mode
    
    * [Engine] [Example] Fix review
---
 .../core/starter/config/ConfigChecker.java         |  4 +-
 .../flink/config/FlinkApiConfigChecker.java        |  2 +-
 seatunnel-core/seatunnel-seatunnel-starter/pom.xml |  6 ++
 .../core/starter/seatunnel/SeaTunnel.java}         | 22 ++---
 .../core/starter/seatunnel/SeaTunnelStarter.java   | 55 ------------
 .../seatunnel/args/ExecutionModeConverter.java}    | 19 +++--
 .../seatunnel/args/SeaTunnelCommandArgs.java       | 42 +++++++++-
 .../command/SeaTunnelApiConfValidateCommand.java   | 52 ++++++++++++
 .../command/SeaTunnelApiTaskExecuteCommand.java    | 98 ++++++++++++++++++++++
 .../command/SeaTunnelCommandBuilder.java}          | 23 ++---
 .../config/SeaTunnelApiConfigChecker.java}         |  6 +-
 .../starter/spark/command/SparkCommandBuilder.java | 13 +--
 .../spark/config/SparkApiConfigChecker.java        |  2 +-
 .../engine/client/job/ClientJobProxy.java          | 21 ++---
 .../seatunnel/engine/server/SeaTunnelServer.java   |  1 +
 .../seatunnel-engine-examples/pom.xml              | 18 ++++
 .../example/engine/SeaTunnelEngineExample.java     | 52 ++++++++++++
 .../main/resources/examples/fake_to_console.conf   | 51 +++++++++++
 18 files changed, 365 insertions(+), 122 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigChecker.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigChecker.java
index 96925569c..3512eb38a 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigChecker.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigChecker.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.core.starter.config;
 
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -25,9 +24,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 /**
  * Check the config is valid.
  *
- * @param <ENVIRONMENT> the environment type.
  */
-public interface ConfigChecker<ENVIRONMENT extends RuntimeEnv> {
+public interface ConfigChecker {
 
     /**
      * Check if the config is validated, if check fails, throw exception.
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
index 2ae68515d..4a41e8b11 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
@@ -22,7 +22,7 @@ import 
org.apache.seatunnel.core.starter.exception.ConfigCheckException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-public class FlinkApiConfigChecker implements 
ConfigChecker<FlinkApiEnvironment> {
+public class FlinkApiConfigChecker implements ConfigChecker {
 
     @Override
     public void checkConfig(Config config) throws ConfigCheckException {
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/pom.xml 
b/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
index d2dd1ddbe..b4d7c3bf6 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
+++ b/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
@@ -46,6 +46,12 @@
             <artifactId>seatunnel-engine-client</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-engine-server</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
similarity index 52%
copy from 
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
copy to 
seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
index 2ae68515d..a3a98a7b1 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
@@ -15,17 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.config;
+package org.apache.seatunnel.core.starter.seatunnel;
 
-import org.apache.seatunnel.core.starter.config.ConfigChecker;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import 
org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelCommandBuilder;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-public class FlinkApiConfigChecker implements 
ConfigChecker<FlinkApiEnvironment> {
-
-    @Override
-    public void checkConfig(Config config) throws ConfigCheckException {
-        // todo: implement
+public class SeaTunnel {
+    public static void main(String[] args) throws CommandException {
+        SeaTunnelCommandArgs seaTunnelCommandArgs = 
CommandLineUtils.parseSeaTunnelArgs(args);
+        Command<SeaTunnelCommandArgs> command =
+            new SeaTunnelCommandBuilder().buildCommand(seaTunnelCommandArgs);
+        Seatunnel.run(command);
     }
 }
diff --git 
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
deleted file mode 100644
index 94335cca9..000000000
--- 
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.seatunnel;
-
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
-import org.apache.seatunnel.core.starter.utils.FileUtils;
-import org.apache.seatunnel.engine.client.SeaTunnelClient;
-import org.apache.seatunnel.engine.client.job.ClientJobProxy;
-import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.common.config.ConfigProvider;
-import org.apache.seatunnel.engine.common.config.JobConfig;
-
-import com.hazelcast.client.config.ClientConfig;
-
-import java.nio.file.Path;
-import java.util.concurrent.ExecutionException;
-
-public class SeaTunnelStarter {
-    public static void main(String[] args) {
-        SeaTunnelCommandArgs seaTunnelCommandArgs = 
CommandLineUtils.parseSeaTunnelArgs(args);
-        Path configFile = FileUtils.getConfigPath(seaTunnelCommandArgs);
-        Common.setDeployMode(DeployMode.CLIENT);
-        JobConfig jobConfig = new JobConfig();
-        jobConfig.setName("fake_to_file");
-
-        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        JobExecutionEnvironment jobExecutionEnv = 
engineClient.createExecutionContext(configFile.toString(), jobConfig);
-
-        ClientJobProxy clientJobProxy;
-        try {
-            clientJobProxy = jobExecutionEnv.execute();
-            clientJobProxy.waitForJobComplete();
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
similarity index 59%
copy from 
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
copy to 
seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
index 2ae68515d..ee7da538b 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
@@ -15,17 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.config;
+package org.apache.seatunnel.core.starter.seatunnel.args;
 
-import org.apache.seatunnel.core.starter.config.ConfigChecker;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-public class FlinkApiConfigChecker implements 
ConfigChecker<FlinkApiEnvironment> {
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.ParameterException;
 
+public class ExecutionModeConverter implements IStringConverter<ExecutionMode> 
{
     @Override
-    public void checkConfig(Config config) throws ConfigCheckException {
-        // todo: implement
+    public ExecutionMode convert(String value) {
+        try {
+            return ExecutionMode.valueOf(value.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            throw new ParameterException("execution-mode: " + value + " is not 
allowed.");
+        }
     }
 }
diff --git 
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
index 19c5d8587..9e86b2800 100644
--- 
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
@@ -20,16 +20,56 @@ package org.apache.seatunnel.core.starter.seatunnel.args;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
 import org.apache.seatunnel.core.starter.config.EngineType;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+
+import com.beust.jcommander.Parameter;
 
 import java.util.List;
 
 public class SeaTunnelCommandArgs extends AbstractCommandArgs {
 
     /**
-     * Undefined parameters parsed will be stored here as seatunnel engint 
command parameters.
+     * Undefined parameters parsed will be stored here as seatunnel engine 
command parameters.
      */
     private List<String> seatunnelParams;
 
+    @Parameter(names = {"-n", "--name"},
+        description = "The name of job")
+    private String name = "seatunnel_job";
+
+    @Parameter(names = {"-cn", "--cluster"},
+        description = "The name of cluster")
+    private String clusterName = "seatunnel_default_cluster";
+
+    @Parameter(names = {"-e", "--deploy-mode"},
+        description = "SeaTunnel deploy mode",
+        converter = ExecutionModeConverter.class)
+    private ExecutionMode executionMode = ExecutionMode.LOCAL;
+
+    public ExecutionMode getExecutionMode() {
+        return executionMode;
+    }
+
+    public void setExecutionMode(ExecutionMode executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
     @Override
     public EngineType getEngineType() {
         return EngineType.SEATUNNEL;
diff --git 
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java
new file mode 100644
index 000000000..8036120ae
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel.command;
+
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import 
org.apache.seatunnel.core.starter.seatunnel.config.SeaTunnelApiConfigChecker;
+import org.apache.seatunnel.core.starter.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+
+/**
+ * Use to validate the configuration of the SeaTunnel API.
+ */
+public class SeaTunnelApiConfValidateCommand implements 
Command<SeaTunnelCommandArgs> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
+
+    private final SeaTunnelCommandArgs seaTunnelCommandArgs;
+
+    public SeaTunnelApiConfValidateCommand(SeaTunnelCommandArgs 
seaTunnelCommandArgs) {
+        this.seaTunnelCommandArgs = seaTunnelCommandArgs;
+    }
+
+    @Override
+    public void execute() throws ConfigCheckException {
+        Path configPath = FileUtils.getConfigPath(seaTunnelCommandArgs);
+        ConfigBuilder configBuilder = new ConfigBuilder(configPath);
+        new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
+        LOGGER.info("config OK !");
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java
new file mode 100644
index 000000000..d9d8638d9
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel.command;
+
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.utils.FileUtils;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+
+import java.nio.file.Path;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * This command is used to execute the SeaTunnel engine job by SeaTunnel API.
+ */
+public class SeaTunnelApiTaskExecuteCommand implements 
Command<SeaTunnelCommandArgs> {
+
+    private final SeaTunnelCommandArgs seaTunnelCommandArgs;
+
+    // TODO custom cluster name on cluster execution mode
+
+    public SeaTunnelApiTaskExecuteCommand(SeaTunnelCommandArgs 
seaTunnelCommandArgs) {
+        this.seaTunnelCommandArgs = seaTunnelCommandArgs;
+    }
+
+    @Override
+    public void execute() throws CommandExecuteException {
+        Path configFile = FileUtils.getConfigPath(seaTunnelCommandArgs);
+
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName(seaTunnelCommandArgs.getName());
+        HazelcastInstance instance = null;
+        try {
+            String clusterName = seaTunnelCommandArgs.getClusterName();
+            if 
(seaTunnelCommandArgs.getExecutionMode().equals(ExecutionMode.LOCAL)) {
+                clusterName = creatRandomClusterName(clusterName);
+                instance = createServerInLocal(clusterName);
+            }
+            ClientConfig clientConfig = 
ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(clusterName);
+            SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv = 
engineClient.createExecutionContext(configFile.toString(), jobConfig);
+
+            ClientJobProxy clientJobProxy;
+            clientJobProxy = jobExecutionEnv.execute();
+            clientJobProxy.waitForJobComplete();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new CommandExecuteException("SeaTunnel job executed failed", 
e);
+        } finally {
+            if (instance != null) {
+                instance.shutdown();
+            }
+        }
+    }
+
+    private HazelcastInstance createServerInLocal(String clusterName) {
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
+        return 
HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+            Thread.currentThread().getName(),
+            new SeaTunnelNodeContext(seaTunnelConfig));
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private String creatRandomClusterName(String namePrefix) {
+        Random random = new Random();
+        return namePrefix + "-" + random.nextInt(1000000);
+    }
+
+}
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
similarity index 58%
copy from 
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
copy to 
seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
index 9f6a9ec5b..722c84e7b 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
@@ -15,30 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.spark.command;
+package org.apache.seatunnel.core.starter.seatunnel.command;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.command.CommandBuilder;
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
 
-public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
+public class SeaTunnelCommandBuilder implements 
CommandBuilder<SeaTunnelCommandArgs> {
 
     @Override
-    public Command<SparkCommandArgs> buildCommand(SparkCommandArgs 
commandArgs) {
+    public Command<SeaTunnelCommandArgs> buildCommand(SeaTunnelCommandArgs 
commandArgs) {
         Common.setDeployMode(commandArgs.getDeployMode());
-        return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
-    }
-
-    /**
-     * Used to generate command for seaTunnel API.
-     */
-    private static class SeaTunnelApiCommandBuilder extends 
SparkCommandBuilder {
-        @Override
-        public Command<SparkCommandArgs> buildCommand(SparkCommandArgs 
commandArgs) {
-            return commandArgs.isCheckConfig() ? new 
SparkApiConfValidateCommand(commandArgs)
-                    : new SparkApiTaskExecuteCommand(commandArgs);
-        }
+        return commandArgs.isCheckConfig() ? new 
SeaTunnelApiConfValidateCommand(commandArgs)
+            : new SeaTunnelApiTaskExecuteCommand(commandArgs);
     }
 }
-
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
similarity index 87%
copy from 
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
copy to 
seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
index b08a4a752..bfaff2a9c 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.spark.config;
+package org.apache.seatunnel.core.starter.seatunnel.config;
 
 import org.apache.seatunnel.core.starter.config.ConfigChecker;
 import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-public class SparkApiConfigChecker implements ConfigChecker<SparkEnvironment> {
+public class SeaTunnelApiConfigChecker implements ConfigChecker {
 
     @Override
     public void checkConfig(Config config) throws ConfigCheckException {
-        // todo: implement
+        // TODO implement
     }
 }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
index 9f6a9ec5b..9bf8e376e 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
@@ -27,18 +27,9 @@ public class SparkCommandBuilder implements 
CommandBuilder<SparkCommandArgs> {
     @Override
     public Command<SparkCommandArgs> buildCommand(SparkCommandArgs 
commandArgs) {
         Common.setDeployMode(commandArgs.getDeployMode());
-        return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
+        return commandArgs.isCheckConfig() ? new 
SparkApiConfValidateCommand(commandArgs)
+            : new SparkApiTaskExecuteCommand(commandArgs);
     }
 
-    /**
-     * Used to generate command for seaTunnel API.
-     */
-    private static class SeaTunnelApiCommandBuilder extends 
SparkCommandBuilder {
-        @Override
-        public Command<SparkCommandArgs> buildCommand(SparkCommandArgs 
commandArgs) {
-            return commandArgs.isCheckConfig() ? new 
SparkApiConfValidateCommand(commandArgs)
-                    : new SparkApiTaskExecuteCommand(commandArgs);
-        }
-    }
 }
 
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
index b08a4a752..afdf77579 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
@@ -22,7 +22,7 @@ import 
org.apache.seatunnel.core.starter.exception.ConfigCheckException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-public class SparkApiConfigChecker implements ConfigChecker<SparkEnvironment> {
+public class SparkApiConfigChecker implements ConfigChecker {
 
     @Override
     public void checkConfig(Config config) throws ConfigCheckException {
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index f25ee6998..8c372f2d5 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -37,8 +37,8 @@ import lombok.NonNull;
 
 public class ClientJobProxy implements Job {
     private static final ILogger LOGGER = 
Logger.getLogger(ClientJobProxy.class);
-    private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
-    private JobImmutableInformation jobImmutableInformation;
+    private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
+    private final JobImmutableInformation jobImmutableInformation;
 
     public ClientJobProxy(@NonNull SeaTunnelHazelcastClient 
seaTunnelHazelcastClient,
                           @NonNull JobImmutableInformation 
jobImmutableInformation) {
@@ -63,10 +63,10 @@ public class ClientJobProxy implements Job {
     /**
      * This method will block even the Job turn to a EndState
      *
-     * @return
+     * @return The job final status
      */
     public JobStatus waitForJobComplete() {
-        JobStatus jobStatus = null;
+        JobStatus jobStatus;
         try {
             jobStatus = RetryUtils.retryWithException(() -> {
                 PassiveCompletableFuture<JobStatus> jobFuture = 
doWaitForJobComplete();
@@ -84,18 +84,15 @@ public class ClientJobProxy implements Job {
             jobImmutableInformation.getJobConfig().getName(),
             jobImmutableInformation.getJobId(),
             jobStatus));
+        this.seaTunnelHazelcastClient.getHazelcastInstance().shutdown();
         return jobStatus;
     }
 
     @Override
     public PassiveCompletableFuture<JobStatus> doWaitForJobComplete() {
-        PassiveCompletableFuture<JobStatus> jobFuture =
-            seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
-                
SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
-                response -> {
-                    return 
JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)];
-                });
-        return jobFuture;
+        return seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
+            
SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
+            response -> 
JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)]);
     }
 
     @Override
@@ -110,7 +107,7 @@ public class ClientJobProxy implements Job {
     public JobStatus getJobStatus() {
         int jobStatusOrdinal = 
seaTunnelHazelcastClient.requestOnMasterAndDecodeResponse(
             
SeaTunnelGetJobStatusCodec.encodeRequest(jobImmutableInformation.getJobId()),
-            response -> SeaTunnelGetJobStatusCodec.decodeResponse(response));
+            SeaTunnelGetJobStatusCodec::decodeResponse);
         return JobStatus.values()[jobStatusOrdinal];
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 56c006206..eecd2f2f1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -128,6 +128,7 @@ public class SeaTunnelServer implements ManagedService, 
MembershipAwareService,
         if (resourceManager != null) {
             resourceManager.close();
         }
+        executorService.shutdown();
         taskExecutionService.shutdown();
     }
 
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml 
b/seatunnel-examples/seatunnel-engine-examples/pom.xml
index 9c3a5ca5d..b75a68b85 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml
@@ -27,4 +27,22 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>seatunnel-engine-examples</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-seatunnel-starter</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
 
b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
new file mode 100644
index 000000000..a50f31271
--- /dev/null
+++ 
b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.engine;
+
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import 
org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelCommandBuilder;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class SeaTunnelEngineExample {
+
+    public static void main(String[] args) throws FileNotFoundException, 
URISyntaxException, CommandException {
+        String configFile = 
getTestConfigFile("/examples/fake_to_console.conf");
+        SeaTunnelCommandArgs seaTunnelCommandArgs = new SeaTunnelCommandArgs();
+        seaTunnelCommandArgs.setConfigFile(configFile);
+        seaTunnelCommandArgs.setCheckConfig(false);
+        seaTunnelCommandArgs.setName("fake_to_console");
+        Command<SeaTunnelCommandArgs> command =
+            new SeaTunnelCommandBuilder().buildCommand(seaTunnelCommandArgs);
+        Seatunnel.run(command);
+    }
+
+    public static String getTestConfigFile(String configFile) throws 
FileNotFoundException, URISyntaxException {
+        URL resource = SeaTunnelEngineExample.class.getResource(configFile);
+        if (resource == null) {
+            throw new FileNotFoundException("Can't find config file: " + 
configFile);
+        }
+        return Paths.get(resource.toURI()).toString();
+    }
+
+}
diff --git 
a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
 
b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
new file mode 100644
index 000000000..404c9ec69
--- /dev/null
+++ 
b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 3
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 5000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    field_name = "name,age",
+    parallelism = 2
+  }
+
+  FakeSource {
+    result_table_name = "fake"
+    field_name = "name,age",
+    parallelism = 3
+  }
+}
+
+transform {
+}
+
+sink {
+  console {
+    source_table_name="fake"
+  }
+}
\ No newline at end of file

Reply via email to