This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new ce6dbb2ae [ST-Engine][seatunnel-seatunnel-starter] 
seatunnel-seatunnel-starter  (#2358)
ce6dbb2ae is described below

commit ce6dbb2ae250638e40c32bdf2137c972132ecbf6
Author: Eric <[email protected]>
AuthorDate: Fri Aug 5 14:43:33 2022 +0800

    [ST-Engine][seatunnel-seatunnel-starter] seatunnel-seatunnel-starter  
(#2358)
    
    * Add SeaTunnel Engine ConfigProvider and seatunnel-seatunnel-starter
    
    * add source file to licenserc.yaml
    
    Co-authored-by: Wenjun Ruan <[email protected]>
---
 .licenserc.yaml                                    |   6 ++
 LICENSE                                            |   6 +-
 seatunnel-core/pom.xml                             |   1 +
 .../seatunnel/core/starter/config/EngineType.java  |   1 +
 seatunnel-core/seatunnel-seatunnel-starter/pom.xml |  51 ++++++++++
 .../src/main/bin/start-seatunnel-engine-job.sh     |  64 ++++++++++++
 .../core/starter/seatunnel/CommandLineUtils.java}  |  24 +++--
 .../core/starter/seatunnel/SeaTunnelStarter.java   |  50 ++++++++++
 .../seatunnel/args/SeaTunnelCommandArgs.java       |  50 ++++++++++
 .../seatunnel/engine/client/JobConfigParser.java   |  13 ++-
 .../engine/client/JobExecutionEnvironment.java     |   2 +-
 .../seatunnel/engine/client/SeaTunnelClient.java   |   5 +-
 .../engine/client/SeaTunnelHazelcastClient.java    |   7 +-
 .../engine/client/JobConfigParserTest.java         |   5 +-
 .../engine/client/LogicalDagGeneratorTest.java     |   2 +-
 .../engine/client/SeaTunnelClientTest.java         |  24 ++---
 .../apache/seatunnel/engine/common/Constant.java   |  14 ++-
 .../engine/common/config/ConfigProvider.java       | 110 +++++++++++++++++++++
 .../engine/common/config/EngineConfig.java         |  21 ++--
 .../common/config}/SeaTunnelClientConfig.java      |   2 +-
 .../engine/common/config/SeaTunnelConfig.java      |  71 +++++++++++++
 .../common/config/SeaTunnelConfigSections.java     |  49 +++++++++
 .../engine/common/config/SeaTunnelProperties.java  |  21 ++--
 .../common/config/YamlSeaTunnelConfigBuilder.java  | 105 ++++++++++++++++++++
 .../common/config/YamlSeaTunnelConfigLocator.java  |  61 ++++++++++++
 .../config/YamlSeaTunnelDomConfigProcessor.java    |  79 +++++++++++++++
 .../engine/common/utils/ExceptionUtil.java         |  13 ++-
 .../src/main/resources/hazelcast-client.yaml       |  23 +++++
 .../src/main/resources/hazelcast.yaml              |  29 ++++++
 .../src/main/resources/seatunnel-default.yaml      |  20 ++++
 .../engine/server/operation/AsyncOperation.java    |  13 ++-
 .../task/AbstractSeaTunnelMessageTask.java         |  13 ++-
 32 files changed, 876 insertions(+), 79 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 0fd78f91c..a105be8bc 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -40,5 +40,11 @@ header:
     - '**/NOTICE'
     - '**/.gitkeep'
     - '**/com/typesafe/config/**'
+    - 
'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java'
+    - 
'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java'
+    - 
'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java'
+    - 
'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java'
+    - 
'seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java'
+    - 
'seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java'
 
   comment: on-failure
diff --git a/LICENSE b/LICENSE
index 7d4d93ac3..696821eed 100644
--- a/LICENSE
+++ b/LICENSE
@@ -219,4 +219,8 @@ 
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
 generate_client_protocol.sh                                                    
                                                  from 
https://github.com/hazelcast/hazelcast
 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
               from https://github.com/hazelcast/hazelcast
 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
               from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
               from https://github.com/hazelcast/hazelcast
\ No newline at end of file
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
               from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java
               from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java
               from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java
               from https://github.com/hazelcast/hazelcast
+
diff --git a/seatunnel-core/pom.xml b/seatunnel-core/pom.xml
index 937fe767b..d5f96fcbd 100644
--- a/seatunnel-core/pom.xml
+++ b/seatunnel-core/pom.xml
@@ -38,5 +38,6 @@
         <module>seatunnel-core-starter</module>
         <module>seatunnel-flink-starter</module>
         <module>seatunnel-spark-starter</module>
+        <module>seatunnel-seatunnel-starter</module>
     </modules>
 </project>
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
index 50d2e435e..b0c47d2ac 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.config;
 public enum EngineType {
     SPARK("spark"),
     FLINK("flink"),
+    SEATUNNEL("seatunnel"),
     ;
 
     private final String engine;
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/pom.xml 
b/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
new file mode 100644
index 000000000..d2dd1ddbe
--- /dev/null
+++ b/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-core</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-seatunnel-starter</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-core-starter</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-engine-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh
new file mode 100755
index 000000000..1cfc741d8
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -eu
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ] ; do
+  # shellcheck disable=SC2006
+  ls=`ls -ld "$PRG"`
+  # shellcheck disable=SC2006
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    # shellcheck disable=SC2006
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+PRG_DIR=`dirname "$PRG"`
+APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
+CONF_DIR=${APP_DIR}/config
+APP_JAR=${APP_DIR}/lib/seatunnel-seatunnel-starter.jar
+APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelStarter"
+
+if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
+    . "${CONF_DIR}/seatunnel-env.sh"
+fi
+
+if [ $# == 0 ]
+then
+    args="-h"
+else
+    args=$@
+fi
+
+CMD=$(java -cp ${APP_JAR} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
+if [ ${EXIT_CODE} -eq 234 ]; then
+    # print usage
+    echo "${CMD}"
+    exit 0
+elif [ ${EXIT_CODE} -eq 0 ]; then
+    echo "Execute SeaTunnel Job: ${CMD}"
+    eval ${CMD}
+else
+    echo "${CMD}"
+    exit ${EXIT_CODE}
+fi
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
similarity index 55%
copy from 
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
copy to 
seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
index 50d2e435e..fd204e87a 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
@@ -15,20 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.config;
+package org.apache.seatunnel.core.starter.seatunnel;
 
-public enum EngineType {
-    SPARK("spark"),
-    FLINK("flink"),
-    ;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
 
-    private final String engine;
+import com.beust.jcommander.JCommander;
 
-    EngineType(String engine) {
-        this.engine = engine;
+public class CommandLineUtils {
+
+    private CommandLineUtils() {
+        throw new UnsupportedOperationException("CommandLineUtils is a utility 
class and cannot be instantiated");
     }
 
-    public String getEngine() {
-        return engine;
+    public static SeaTunnelCommandArgs parseSeaTunnelArgs(String[] args) {
+        SeaTunnelCommandArgs seatunnelCommandArgs = new SeaTunnelCommandArgs();
+        JCommander.newBuilder()
+            .addObject(seatunnelCommandArgs)
+            .build()
+            .parse(args);
+        return seatunnelCommandArgs;
     }
 }
diff --git 
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
new file mode 100644
index 000000000..6c3372ad7
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.utils.FileUtils;
+import org.apache.seatunnel.engine.client.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.JobProxy;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+
+import com.hazelcast.client.config.ClientConfig;
+
+import java.nio.file.Path;
+
+public class SeaTunnelStarter {
+    public static void main(String[] args) {
+        SeaTunnelCommandArgs seaTunnelCommandArgs = 
CommandLineUtils.parseSeaTunnelArgs(args);
+        Path configFile = FileUtils.getConfigPath(seaTunnelCommandArgs);
+        Common.setDeployMode(DeployMode.CLIENT);
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("fake_to_file");
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+        JobExecutionEnvironment jobExecutionEnv = 
engineClient.createExecutionContext(configFile.toString(), jobConfig);
+
+        JobProxy jobProxy = jobExecutionEnv.execute();
+
+        // TODO wait for job complete and then exit
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
new file mode 100644
index 000000000..19c5d8587
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel.args;
+
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
+import org.apache.seatunnel.core.starter.config.EngineType;
+
+import java.util.List;
+
+public class SeaTunnelCommandArgs extends AbstractCommandArgs {
+
+    /**
+     * Undefined parameters parsed will be stored here as seatunnel engine 
command parameters.
+     */
+    private List<String> seatunnelParams;
+
+    @Override
+    public EngineType getEngineType() {
+        return EngineType.SEATUNNEL;
+    }
+
+    @Override
+    public DeployMode getDeployMode() {
+        return DeployMode.CLIENT;
+    }
+
+    public List<String> getSeatunnelParams() {
+        return seatunnelParams;
+    }
+
+    public void setSeatunnelParams(List<String> seatunnelParams) {
+        this.seatunnelParams = seatunnelParams;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
index 42d563225..cbd014db7 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.engine.common.config.JobConfig;
 import 
org.apache.seatunnel.engine.common.exception.JobDefineCheckExceptionSeaTunnel;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -70,13 +71,17 @@ public class JobConfigParser {
     private List<Action> actions = new ArrayList<>();
     private Set<URL> jarUrlsSet = new HashSet<>();
 
-    protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull 
IdGenerator idGenerator) {
+    private JobConfig jobConfig;
+
+    protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull 
IdGenerator idGenerator, @NonNull JobConfig jobConfig) {
         this.jobDefineFilePath = jobDefineFilePath;
         this.idGenerator = idGenerator;
+        this.jobConfig = jobConfig;
     }
 
     public ImmutablePair<List<Action>, Set<URL>> parse() {
         Config seaTunnelJobConfig = new 
ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
+        Config envConfigs = seaTunnelJobConfig.getConfig("env");
         List<? extends Config> sinkConfigs = 
seaTunnelJobConfig.getConfigList("sink");
         List<? extends Config> transformConfigs = 
seaTunnelJobConfig.getConfigList("transform");
         List<? extends Config> sourceConfigs = 
seaTunnelJobConfig.getConfigList("source");
@@ -85,6 +90,8 @@ public class JobConfigParser {
             throw new JobDefineCheckExceptionSeaTunnel("Source And Sink can 
not be null");
         }
 
+        jobConfigAnalyze(envConfigs);
+
         if (sinkConfigs.size() == 1
             && sourceConfigs.size() == 1
             && (CollectionUtils.isEmpty(transformConfigs) || 
transformConfigs.size() == 1)) {
@@ -95,6 +102,10 @@ public class JobConfigParser {
         return new ImmutablePair<>(actions, jarUrlsSet);
     }
 
+    private void jobConfigAnalyze(Config envConfigs) {
+        // TODO Resolve env configuration and set jobConfig
+    }
+
     /**
      * If there are multiple sources or multiple transforms or multiple sink, 
We will rely on
      * source_table_name and result_table_name to build actions pipeline.
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
index a8257bebd..94b965460 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -59,7 +59,7 @@ public class JobExecutionEnvironment {
     }
 
     private JobConfigParser getJobConfigParser() {
-        return new JobConfigParser(jobFilePath, idGenerator);
+        return new JobConfigParser(jobFilePath, idGenerator, jobConfig);
     }
 
     public void addAction(List<Action> actions) {
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 5aa194ce7..e182c527e 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -20,14 +20,15 @@ package org.apache.seatunnel.engine.client;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
 
+import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.logging.ILogger;
 import lombok.NonNull;
 
 public class SeaTunnelClient implements SeaTunnelClientInstance {
     private SeaTunnelHazelcastClient hazelcastClient;
 
-    public SeaTunnelClient(@NonNull SeaTunnelClientConfig 
seaTunnelClientConfig) {
-        this.hazelcastClient = new 
SeaTunnelHazelcastClient(seaTunnelClientConfig);
+    public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
+        this.hazelcastClient = new SeaTunnelHazelcastClient(clientConfig);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index 6a696d628..369d290bb 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.client;
 
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 
+import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
 import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
 import com.hazelcast.client.impl.protocol.ClientMessage;
@@ -37,11 +38,11 @@ public class SeaTunnelHazelcastClient {
     private final HazelcastClientInstanceImpl hazelcastClient;
     private final SerializationService serializationService;
 
-    public SeaTunnelHazelcastClient(@NonNull SeaTunnelClientConfig 
seaTunnelClientConfig) {
-        Preconditions.checkNotNull(seaTunnelClientConfig, "config");
+    public SeaTunnelHazelcastClient(@NonNull ClientConfig clientConfig) {
+        Preconditions.checkNotNull(clientConfig, "config");
         this.hazelcastClient =
             ((HazelcastClientProxy) 
com.hazelcast.client.HazelcastClient.newHazelcastClient(
-                seaTunnelClientConfig)).client;
+                clientConfig)).client;
         this.serializationService = hazelcastClient.getSerializationService();
         
ExceptionUtil.registerSeaTunnelExceptions(hazelcastClient.getClientExceptionFactory());
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index f9da29cf0..4534c0a7b 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.client;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 
@@ -40,7 +41,7 @@ public class JobConfigParserTest {
     public void testSimpleJobParse() {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/fakesource_to_file.conf");
-        JobConfigParser jobConfigParser = new JobConfigParser(filePath, new 
IdGenerator());
+        JobConfigParser jobConfigParser = new JobConfigParser(filePath, new 
IdGenerator(), new JobConfig());
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assert.assertEquals(1, actions.size());
@@ -58,7 +59,7 @@ public class JobConfigParserTest {
     public void testComplexJobParse() {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = 
TestUtils.getResource("/fakesource_to_file_complex.conf");
-        JobConfigParser jobConfigParser = new JobConfigParser(filePath, new 
IdGenerator());
+        JobConfigParser jobConfigParser = new JobConfigParser(filePath, new 
IdGenerator(), new JobConfig());
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assert.assertEquals(1, actions.size());
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index c5f282bda..fb1e8f68d 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -48,7 +48,7 @@ public class LogicalDagGeneratorTest {
         jobConfig.setName("fake_to_file");
 
         IdGenerator idGenerator = new IdGenerator();
-        ImmutablePair<List<Action>, Set<URL>> immutablePair = new 
JobConfigParser(filePath, idGenerator).parse();
+        ImmutablePair<List<Action>, Set<URL>> immutablePair = new 
JobConfigParser(filePath, idGenerator, new JobConfig()).parse();
 
         LogicalDagGenerator logicalDagGenerator =
             new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, 
idGenerator);
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 51fd02134..2c451034c 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -20,12 +20,14 @@ package org.apache.seatunnel.engine.client;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 
 import com.google.common.collect.Lists;
-import com.hazelcast.config.Config;
+import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -39,19 +41,16 @@ public class SeaTunnelClientTest {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
-        Config config = new Config();
-        config.getSecurityConfig().setEnabled(false);
-        config.getJetConfig().setEnabled(false);
-        config.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
-        config.getNetworkConfig().setPort(50001);
-        HazelcastInstanceFactory.newHazelcastInstance(config, 
Thread.currentThread().getName(),
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        
HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+            Thread.currentThread().getName(),
             new SeaTunnelNodeContext());
     }
 
     @Test
     public void testSayHello() {
         SeaTunnelClientConfig seaTunnelClientConfig = new 
SeaTunnelClientConfig();
-        
seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:50001"));
+        
seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801"));
         SeaTunnelClient engineClient = new 
SeaTunnelClient(seaTunnelClientConfig);
 
         String msg = "Hello world";
@@ -67,11 +66,12 @@ public class SeaTunnelClientTest {
         jobConfig.setBoundedness(Boundedness.BOUNDED);
         jobConfig.setName("fake_to_file");
 
-        SeaTunnelClientConfig seaTunnelClientConfig = new 
SeaTunnelClientConfig();
-        
seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:50001"));
-        SeaTunnelClient engineClient = new 
SeaTunnelClient(seaTunnelClientConfig);
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = 
engineClient.createExecutionContext(filePath, jobConfig);
 
         JobProxy jobProxy = jobExecutionEnv.execute();
+
+        Assert.assertNotNull(jobProxy);
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 615b6ca48..9c016d91f 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -22,5 +22,17 @@ public class Constant {
 
     public static final String SEATUNNEL_ID_GENERATOR_NAME = 
"SeaTunnelIdGenerator";
 
-    public static final String DEFAULT_SEATUNNEL_CLUSTER_NAME = "SeaTunnel";
+    public static final String DEFAULT_SEATUNNEL_CLUSTER_NAME = "seatunnel";
+
+    /**
+     * The default port number for the cluster auto-discovery mechanism's
+     * multicast communication.
+     */
+    public static final int DEFAULT_SEATUNNEL_MULTICAST_PORT = 53326;
+
+    public static final String SYSPROP_SEATUNNEL_CONFIG = 
"hazelcast.seatunnel.config";
+
+    public static final String HAZELCAST_SEATUNNEL_CONF_FILE_PREFIX = 
"seatunnel";
+
+    public static final String HAZELCAST_SEATUNNEL_DEFAULT_YAML = 
"seatunnel-default.yaml";
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java
new file mode 100644
index 000000000..591afc0ec
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import static 
com.hazelcast.internal.config.DeclarativeConfigUtil.SYSPROP_CLIENT_CONFIG;
+import static 
com.hazelcast.internal.config.DeclarativeConfigUtil.SYSPROP_MEMBER_CONFIG;
+import static 
com.hazelcast.internal.config.DeclarativeConfigUtil.validateSuffixInSystemProperty;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.client.config.YamlClientConfigBuilder;
+import com.hazelcast.client.config.impl.YamlClientConfigLocator;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.YamlConfigBuilder;
+import com.hazelcast.internal.config.YamlConfigLocator;
+import lombok.NonNull;
+
+import java.util.Properties;
+
+/**
+ * Entry point for resolving SeaTunnel, Hazelcast client and Hazelcast member
+ * configurations from their YAML sources.
+ *
+ * <p>Every lookup tries the same places in the same order: the system property,
+ * then the working directory or the classpath, and finally the default YAML
+ * configuration file.
+ *
+ * @see YamlSeaTunnelConfigLocator
+ */
+public final class ConfigProvider {
+
+    private ConfigProvider() {
+    }
+
+    /**
+     * Resolves the SeaTunnel configuration without caller-supplied properties.
+     *
+     * @return the configuration built from the first YAML source that is located
+     */
+    public static SeaTunnelConfig locateAndGetSeaTunnelConfig() {
+        return locateAndGetSeaTunnelConfig(null);
+    }
+
+    /**
+     * Resolves the SeaTunnel configuration.
+     *
+     * @param properties passed to {@link YamlSeaTunnelConfigBuilder#setProperties(Properties)};
+     *                   may be {@code null}
+     * @return the configuration built from the first YAML source that is located
+     */
+    @NonNull
+    public static SeaTunnelConfig locateAndGetSeaTunnelConfig(Properties properties) {
+        YamlSeaTunnelConfigLocator locator = new YamlSeaTunnelConfigLocator();
+
+        // 1. system property, 2. working directory or classpath, 3. default YAML file
+        if (!locator.locateFromSystemProperty() && !locator.locateInWorkDirOrOnClasspath()) {
+            locator.locateDefault();
+        }
+        return new YamlSeaTunnelConfigBuilder(locator).setProperties(properties).build();
+    }
+
+    /**
+     * Resolves the Hazelcast client configuration. Calls
+     * {@code validateSuffixInSystemProperty} for the client config property before
+     * any lookup is attempted.
+     *
+     * @return the client configuration built from the first YAML source that is located
+     */
+    @NonNull
+    public static ClientConfig locateAndGetClientConfig() {
+        validateSuffixInSystemProperty(SYSPROP_CLIENT_CONFIG);
+
+        YamlClientConfigLocator locator = new YamlClientConfigLocator();
+
+        // 1. system property, 2. working directory or classpath, 3. default YAML file
+        if (!locator.locateFromSystemProperty() && !locator.locateInWorkDirOrOnClasspath()) {
+            locator.locateDefault();
+        }
+        return new YamlClientConfigBuilder(locator.getIn()).build();
+    }
+
+    /**
+     * Resolves the Hazelcast member configuration. Calls
+     * {@code validateSuffixInSystemProperty} for the member config property before
+     * any lookup is attempted.
+     *
+     * @param properties passed to the YAML config builder's {@code setProperties}
+     * @return the member configuration built from the first YAML source that is located
+     */
+    @NonNull
+    public static Config locateAndGetMemberConfig(Properties properties) {
+        validateSuffixInSystemProperty(SYSPROP_MEMBER_CONFIG);
+
+        YamlConfigLocator locator = new YamlConfigLocator();
+
+        // 1. system property, 2. working directory or classpath, 3. default YAML file
+        if (!locator.locateFromSystemProperty() && !locator.locateInWorkDirOrOnClasspath()) {
+            locator.locateDefault();
+        }
+        return new YamlConfigBuilder(locator.getIn()).setProperties(properties).build();
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
similarity index 68%
copy from 
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
copy to 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 50d2e435e..e62481cf8 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -15,20 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.config;
+package org.apache.seatunnel.engine.common.config;
 
-public enum EngineType {
-    SPARK("spark"),
-    FLINK("flink"),
-    ;
+import static com.hazelcast.internal.util.Preconditions.checkBackupCount;
 
-    private final String engine;
+import lombok.Data;
 
-    EngineType(String engine) {
-        this.engine = engine;
-    }
+@Data
+public class EngineConfig {
+    private int backupCount;
 
-    public String getEngine() {
-        return engine;
+    public EngineConfig setBackupCount(int newBackupCount) {
+        checkBackupCount(newBackupCount, 0);
+        this.backupCount = newBackupCount;
+        return this;
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelClientConfig.java
similarity index 95%
rename from 
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
rename to 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelClientConfig.java
index c6c6972a3..55daaef25 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelClientConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.common.config;
 
 import org.apache.seatunnel.engine.common.Constant;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
new file mode 100644
index 000000000..98f6c4732
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import org.apache.seatunnel.engine.common.Constant;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+
+import java.io.File;
+
+/**
+ * SeaTunnel engine configuration: the engine-specific settings plus the
+ * Hazelcast member {@link Config} the engine runs on.
+ */
+public class SeaTunnelConfig {
+
+    private static final ILogger LOGGER = Logger.getLogger(SeaTunnelConfig.class);
+
+    private final EngineConfig engineConfig = new EngineConfig();
+
+    // On class load, write the resolved absolute seatunnel.home back to the system property.
+    static {
+        String home = seatunnelHome();
+        LOGGER.info("seatunnel.home is " + home);
+        System.setProperty(SeaTunnelProperties.SEATUNNEL_HOME.getName(), home);
+    }
+
+    private Config hazelcastConfig;
+
+    /**
+     * Creates a configuration backed by a fresh Hazelcast {@link Config} carrying the
+     * SeaTunnel default cluster name, multicast port and a {@code recovery} base
+     * directory under seatunnel.home.
+     */
+    public SeaTunnelConfig() {
+        Config defaults = new Config();
+        defaults.getNetworkConfig().getJoin().getMulticastConfig()
+            .setMulticastPort(Constant.DEFAULT_SEATUNNEL_MULTICAST_PORT);
+        defaults.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
+
+        File recoveryDir = new File(seatunnelHome(), "recovery").getAbsoluteFile();
+        defaults.getHotRestartPersistenceConfig().setBaseDir(recoveryDir);
+
+        this.hazelcastConfig = defaults;
+    }
+
+    /**
+     * Returns the absolute path for seatunnel.home, read from the system property
+     * named by {@link SeaTunnelProperties#SEATUNNEL_HOME} and falling back to that
+     * property's default value when it is not set.
+     */
+    private static String seatunnelHome() {
+        String key = SeaTunnelProperties.SEATUNNEL_HOME.getName();
+        String fallback = SeaTunnelProperties.SEATUNNEL_HOME.getDefaultValue();
+        return new File(System.getProperty(key, fallback)).getAbsolutePath();
+    }
+
+    public Config getHazelcastConfig() {
+        return this.hazelcastConfig;
+    }
+
+    public void setHazelcastConfig(Config hazelcastConfig) {
+        this.hazelcastConfig = hazelcastConfig;
+    }
+
+    public EngineConfig getEngineConfig() {
+        return this.engineConfig;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java
new file mode 100644
index 000000000..0fecdb0e6
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+/**
+ * Top-level sections of the Hazelcast SeaTunnel YAML configuration, each
+ * flagged with whether it may appear more than once.
+ */
+enum SeaTunnelConfigSections {
+    SEATUNNEL("seatunnel", false),
+    ENGINE("engine", false);
+
+    final String name;
+    final boolean multipleOccurrence;
+
+    SeaTunnelConfigSections(String name, boolean multipleOccurrence) {
+        this.name = name;
+        this.multipleOccurrence = multipleOccurrence;
+    }
+
+    /**
+     * Tells whether the section called {@code name} may be repeated.
+     *
+     * @param name section name to look up
+     * @return the matching section's flag, or {@code false} when no section has that name
+     */
+    static boolean canOccurMultipleTimes(String name) {
+        boolean repeatable = false;
+        for (SeaTunnelConfigSections section : values()) {
+            if (name.equals(section.name)) {
+                repeatable = section.multipleOccurrence;
+                break;
+            }
+        }
+        return repeatable;
+    }
+
+    /**
+     * @param name candidate section name
+     * @return {@code true} when {@code name} is exactly this section's name
+     */
+    boolean isEqual(String name) {
+        return this.name.equals(name);
+    }
+
+}
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelProperties.java
similarity index 68%
copy from 
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
copy to 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelProperties.java
index 50d2e435e..05885dba8 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelProperties.java
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.config;
+package org.apache.seatunnel.engine.common.config;
 
-public enum EngineType {
-    SPARK("spark"),
-    FLINK("flink"),
-    ;
+import com.hazelcast.spi.properties.HazelcastProperty;
 
-    private final String engine;
-
-    EngineType(String engine) {
-        this.engine = engine;
-    }
+/**
+ * Defines the names and default values for internal Hazelcast SeaTunnel 
properties.
+ */
+public final class SeaTunnelProperties {
+    public static final HazelcastProperty SEATUNNEL_HOME = new 
HazelcastProperty("seatunnel.home", "");
 
-    public String getEngine() {
-        return engine;
+    private SeaTunnelProperties() {
     }
 }
+
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java
new file mode 100644
index 000000000..b4cea0d97
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import static com.hazelcast.internal.config.yaml.W3cDomUtil.asW3cNode;
+
+import com.hazelcast.config.AbstractYamlConfigBuilder;
+import com.hazelcast.config.InvalidConfigurationException;
+import com.hazelcast.internal.config.yaml.YamlDomChecker;
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.internal.yaml.YamlLoader;
+import com.hazelcast.internal.yaml.YamlMapping;
+import com.hazelcast.internal.yaml.YamlNode;
+import com.hazelcast.jet.impl.util.ExceptionUtil;
+import lombok.NonNull;
+import org.w3c.dom.Node;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Builds a {@link SeaTunnelConfig} from a YAML input stream.
+ */
+public class YamlSeaTunnelConfigBuilder extends AbstractYamlConfigBuilder {
+
+    private final InputStream in;
+
+    /**
+     * Creates a builder with no explicit locator; delegates to
+     * {@link #YamlSeaTunnelConfigBuilder(YamlSeaTunnelConfigLocator)} with {@code null}.
+     */
+    public YamlSeaTunnelConfigBuilder() {
+        this((YamlSeaTunnelConfigLocator) null);
+    }
+
+    /**
+     * Creates a builder reading from the stream exposed by {@code locator}.
+     *
+     * @param locator the locator to read from; when {@code null}, a new
+     *                {@link YamlSeaTunnelConfigLocator} is created and
+     *                {@code locateEverywhere()} is called on it
+     */
+    public YamlSeaTunnelConfigBuilder(YamlSeaTunnelConfigLocator locator) {
+        if (locator == null) {
+            locator = new YamlSeaTunnelConfigLocator();
+            locator.locateEverywhere();
+        }
+        this.in = locator.getIn();
+    }
+
+    /**
+     * Creates a builder reading from the given stream.
+     *
+     * @param inputStream the YAML source; it is closed once it has been parsed
+     */
+    public YamlSeaTunnelConfigBuilder(@NonNull InputStream inputStream) {
+        this.in = inputStream;
+    }
+
+    /** Returns the name of the {@link SeaTunnelConfigSections#SEATUNNEL} section. */
+    @Override
+    protected String getConfigRoot() {
+        return SeaTunnelConfigSections.SEATUNNEL.name;
+    }
+
+    /**
+     * Builds into a newly created {@link SeaTunnelConfig}.
+     *
+     * @return the populated configuration
+     */
+    public SeaTunnelConfig build() {
+        return build(new SeaTunnelConfig());
+    }
+
+    /**
+     * Parses the YAML source into {@code config} and then replaces its Hazelcast
+     * config with the one returned by
+     * {@link ConfigProvider#locateAndGetMemberConfig(Properties)}.
+     *
+     * @param config the configuration to populate
+     * @return the same {@code config} instance
+     */
+    public SeaTunnelConfig build(SeaTunnelConfig config) {
+        try {
+            parseAndBuildConfig(config);
+        } catch (Exception e) {
+            throw ExceptionUtil.rethrow(e);
+        }
+        // Whatever Hazelcast config the passed-in object held is overwritten here.
+        
config.setHazelcastConfig(ConfigProvider.locateAndGetMemberConfig(getProperties()));
+        return config;
+    }
+
+    /**
+     * Loads the YAML document, picks the SeaTunnel root node and hands its W3C DOM
+     * view to {@link YamlSeaTunnelDomConfigProcessor}.
+     *
+     * @param config the configuration to populate
+     * @throws InvalidConfigurationException if the stream cannot be loaded as YAML
+     */
+    private void parseAndBuildConfig(SeaTunnelConfig config) throws Exception {
+        YamlMapping yamlRootNode;
+        try {
+            yamlRootNode = (YamlMapping) YamlLoader.load(in);
+        } catch (Exception ex) {
+            throw new InvalidConfigurationException("Invalid YAML 
configuration", ex);
+        } finally {
+            // The stream is closed whether or not loading succeeded.
+            IOUtil.closeResource(in);
+        }
+
+        // Use the "seatunnel" mapping when present; otherwise treat the whole document as the root.
+        YamlNode seatunnelRoot = 
yamlRootNode.childAsMapping(SeaTunnelConfigSections.SEATUNNEL.name);
+        if (seatunnelRoot == null) {
+            seatunnelRoot = yamlRootNode;
+        }
+
+        YamlDomChecker.check(seatunnelRoot);
+
+        // NOTE(review): replaceVariables/importDocuments are not defined in this class,
+        // presumably inherited; keep this call order unchanged.
+        Node w3cRootNode = asW3cNode(seatunnelRoot);
+        replaceVariables(w3cRootNode);
+        importDocuments(seatunnelRoot);
+
+        new YamlSeaTunnelDomConfigProcessor(true, 
config).buildConfig(w3cRootNode);
+    }
+
+    /**
+     * Sets the properties handed to {@code setPropertiesInternal}.
+     *
+     * @param properties the properties to use; {@code null} selects {@link System#getProperties()}
+     * @return this builder
+     */
+    public YamlSeaTunnelConfigBuilder setProperties(Properties properties) {
+        if (properties == null) {
+            properties = System.getProperties();
+        }
+        setPropertiesInternal(properties);
+        return this;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigLocator.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigLocator.java
new file mode 100644
index 000000000..7743195b3
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigLocator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import static 
com.hazelcast.internal.config.DeclarativeConfigUtil.YAML_ACCEPTED_SUFFIXES;
+
+import org.apache.seatunnel.engine.common.Constant;
+
+import com.hazelcast.internal.config.AbstractConfigLocator;
+
+/**
+ * A support class for the {@link YamlSeaTunnelConfigBuilder} to locate the
+ * yaml configuration. Each override delegates to a loader method that is not
+ * defined in this class, passing the SeaTunnel-specific system property,
+ * file-name prefix or default file name taken from {@link Constant}.
+ */
+public final class YamlSeaTunnelConfigLocator extends AbstractConfigLocator {
+
+    public YamlSeaTunnelConfigLocator() {
+    }
+
+    /**
+     * Looks for a configuration named by the {@link Constant#SYSPROP_SEATUNNEL_CONFIG}
+     * system property, accepting the YAML suffixes.
+     */
+    @Override
+    public boolean locateFromSystemProperty() {
+        return loadFromSystemProperty(Constant.SYSPROP_SEATUNNEL_CONFIG, 
YAML_ACCEPTED_SUFFIXES);
+    }
+
+    /**
+     * Same system property and suffixes as {@link #locateFromSystemProperty()}, but
+     * delegates to the {@code ...OrFailOnUnacceptedSuffix} loader variant.
+     */
+    @Override
+    protected boolean locateFromSystemPropertyOrFailOnUnacceptedSuffix() {
+        return 
loadFromSystemPropertyOrFailOnUnacceptedSuffix(Constant.SYSPROP_SEATUNNEL_CONFIG,
+            YAML_ACCEPTED_SUFFIXES);
+    }
+
+    /**
+     * Looks in the working directory using the
+     * {@link Constant#HAZELCAST_SEATUNNEL_CONF_FILE_PREFIX} prefix and the YAML suffixes.
+     */
+    @Override
+    protected boolean locateInWorkDir() {
+        return 
loadFromWorkingDirectory(Constant.HAZELCAST_SEATUNNEL_CONF_FILE_PREFIX, 
YAML_ACCEPTED_SUFFIXES);
+    }
+
+    /**
+     * Looks on the classpath using the
+     * {@link Constant#HAZELCAST_SEATUNNEL_CONF_FILE_PREFIX} prefix and the YAML suffixes.
+     */
+    @Override
+    protected boolean locateOnClasspath() {
+        return 
loadConfigurationFromClasspath(Constant.HAZELCAST_SEATUNNEL_CONF_FILE_PREFIX, 
YAML_ACCEPTED_SUFFIXES);
+    }
+
+    /**
+     * Loads {@link Constant#HAZELCAST_SEATUNNEL_DEFAULT_YAML} from the classpath.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean locateDefault() {
+        
loadDefaultConfigurationFromClasspath(Constant.HAZELCAST_SEATUNNEL_DEFAULT_YAML);
+        return true;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
new file mode 100644
index 000000000..5531db478
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import static com.hazelcast.internal.config.DomConfigHelper.childElements;
+import static com.hazelcast.internal.config.DomConfigHelper.cleanNodeName;
+import static com.hazelcast.internal.config.DomConfigHelper.getIntegerValue;
+
+import com.hazelcast.config.InvalidConfigurationException;
+import com.hazelcast.internal.config.AbstractDomConfigProcessor;
+import org.w3c.dom.Node;
+
+/**
+ * Applies the SeaTunnel sections of a parsed YAML document, exposed as W3C DOM
+ * nodes, to a {@link SeaTunnelConfig}.
+ */
+public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor {
+
+    private final SeaTunnelConfig config;
+
+    /**
+     * @param domLevel3 forwarded unchanged to the superclass constructor
+     * @param config    the configuration that parsed values are written into
+     */
+    YamlSeaTunnelDomConfigProcessor(boolean domLevel3, SeaTunnelConfig config) {
+        super(domLevel3);
+        this.config = config;
+    }
+
+    /**
+     * Walks the children of {@code rootNode} and applies every recognized section.
+     *
+     * @param rootNode root of the SeaTunnel configuration
+     * @throws InvalidConfigurationException if a section that was already processed
+     *                                       and may not repeat appears again
+     */
+    @Override
+    public void buildConfig(Node rootNode) {
+        for (Node node : childElements(rootNode)) {
+            String nodeName = cleanNodeName(node);
+            if (occurrenceSet.contains(nodeName)) {
+                throw new InvalidConfigurationException(
+                    "Duplicate '" + nodeName + "' definition found in the configuration.");
+            }
+            // handleNode returns true for sections it does NOT recognize; those are skipped
+            // and never recorded in occurrenceSet.
+            if (handleNode(node, nodeName)) {
+                continue;
+            }
+            if (!SeaTunnelConfigSections.canOccurMultipleTimes(nodeName)) {
+                occurrenceSet.add(nodeName);
+            }
+        }
+    }
+
+    /**
+     * Dispatches a top-level section to its parser.
+     *
+     * @param node the section node
+     * @param name the cleaned section name
+     * @return {@code false} when the section was recognized and parsed, {@code true}
+     *         when it was not recognized (note the inverted sense)
+     */
+    private boolean handleNode(Node node, String name) {
+        if (SeaTunnelConfigSections.ENGINE.isEqual(name)) {
+            parseEngineConfig(node, config);
+        } else {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Reads the children of the {@code engine} section into the engine configuration.
+     *
+     * @param engineNode the {@code engine} section node
+     * @param config     the configuration whose engine settings are updated
+     * @throws InvalidConfigurationException if the section contains an unknown element
+     */
+    @SuppressWarnings("checkstyle:RegexpSinglelineJava")
+    private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
+        final EngineConfig engineConfig = config.getEngineConfig();
+        for (Node node : childElements(engineNode)) {
+            String name = cleanNodeName(node);
+            switch (name) {
+                case "backup-count":
+                    engineConfig.setBackupCount(
+                        getIntegerValue("backup-count", getTextContent(node))
+                    );
+                    break;
+                default:
+                    // An unknown key comes from the user's YAML, not from a broken internal
+                    // invariant, so report it as a configuration error rather than an
+                    // AssertionError.
+                    throw new InvalidConfigurationException("Unrecognized element: " + name);
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
index 7e1ed660e..a3c211af5 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -1,12 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
 
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
new file mode 100644
index 000000000..a5fe3bc42
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast-client:
+  cluster-name: seatunnel
+
+  network:
+    cluster-members:
+      - localhost:5801
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml 
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
new file mode 100644
index 000000000..bde537011
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+  cluster-name: seatunnel
+  network:
+    join:
+      multicast:
+        enabled: true
+        multicast-group: 224.2.2.3
+        multicast-port: 53328
+    port:
+      auto-increment: true
+      port-count: 100
+      port: 5801
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel-default.yaml
 
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel-default.yaml
new file mode 100644
index 000000000..74f842755
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel-default.yaml
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+    engine:
+        backup-count: 1
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
index 21d6dc8b8..58227776e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
@@ -1,12 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
index 33c829f75..6617a9459 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
@@ -1,12 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

Reply via email to