This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new 3ef1758d8 Upgrade seatunnel to 2.3.1 (#4673)
3ef1758d8 is described below

commit 3ef1758d875f162e2edc7117365e0735bd2d287f
Author: xiutao <[email protected]>
AuthorDate: Tue Jun 20 17:25:43 2023 +0800

    Upgrade seatunnel to 2.3.1 (#4673)
---
 .github/workflows/publish-docker.yaml              |   1 +
 .../manager/label/entity/engine/RunType.scala      |   3 +-
 .../release-docs/licenses/LICENSE-hazelcast.txt    | 202 ++++++++++++++++++
 linkis-engineconn-plugins/seatunnel/pom.xml        |  21 +-
 ...inkClient.java => LinkSeatunnelZetaClient.java} |  19 +-
 ...ient.java => LinkisSeatunnelFlinkV2Client.java} |  17 +-
 ...ient.java => LinkisSeatunnelSparkV2Client.java} |  14 +-
 .../errorcode/SeatunnelErrorCodeSummary.java       |   2 +-
 .../org/apache/seatunnel/common/config/Common.java | 130 +++++++----
 .../seatunnel/core/base/config/ConfigBuilder.java  |  26 +--
 .../seatunnel/core/base/config/PluginFactory.java  | 237 ---------------------
 .../apache/seatunnel/core/flink/FlinkStarter.java  |  74 -------
 .../seatunnel/core/flink/FlinkV2Starter.java       |  99 +++++++++
 .../{SparkStarter.java => SparkV2Starter.java}     | 138 ++++++++----
 .../apache/seatunnel/core/sql/FlinkSqlStarter.java |  69 ------
 .../apache/seatunnel/core/zeta/ZetaStarter.java    |  90 ++++++++
 .../config/SeatunnelFlinkEnvConfiguration.scala    |   9 +-
 ...n.scala => SeatunnelZetaEnvConfiguration.scala} |  26 +--
 .../executor/SeatunnelFlinkOnceCodeExecutor.scala  |  21 +-
 .../executor/SeatunnelSparkOnceCodeExecutor.scala  |   4 +-
 ...r.scala => SeatunnelZetaOnceCodeExecutor.scala} |  57 ++---
 .../factory/SeatunnelEngineConnFactory.scala       |  11 +-
 ...ry.scala => SeatunnelZetaExecutorFactory.scala} |   9 +-
 .../seatunnel/util/SeatunnelUtils.scala            |  15 +-
 .../linkis-metadata-query/service/jdbc/pom.xml     |   2 +-
 tool/dependencies/known-dependencies.txt           |  21 +-
 26 files changed, 734 insertions(+), 583 deletions(-)

diff --git a/.github/workflows/publish-docker.yaml b/.github/workflows/publish-docker.yaml
index fdd998b1e..f9e2a5f7b 100644
--- a/.github/workflows/publish-docker.yaml
+++ b/.github/workflows/publish-docker.yaml
@@ -20,6 +20,7 @@ on:
   push:
     branches:
       - dev-1.4.0
+
 env:
  MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
 
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index 0dee150d2..a24445269 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -43,7 +43,8 @@ object RunType extends Enumeration {
 
   val TRINO_SQL = Value("tsql")
 
-  val SEATUNNEL_FLINK_SQL = Value("sfsql")
+
+  val SEATUNNEL_ZETA = Value("szeta")
   val SEATUNNEL_FLINK = Value("sflink")
   val SEATUNNEL_SPARK = Value("sspark")
 
diff --git a/linkis-dist/release-docs/licenses/LICENSE-hazelcast.txt b/linkis-dist/release-docs/licenses/LICENSE-hazelcast.txt
new file mode 100644
index 000000000..d64569567
--- /dev/null
+++ b/linkis-dist/release-docs/licenses/LICENSE-hazelcast.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/linkis-engineconn-plugins/seatunnel/pom.xml b/linkis-engineconn-plugins/seatunnel/pom.xml
index ed843f9f2..a04c71f29 100644
--- a/linkis-engineconn-plugins/seatunnel/pom.xml
+++ b/linkis-engineconn-plugins/seatunnel/pom.xml
@@ -26,6 +26,10 @@
 
   <artifactId>linkis-engineplugin-seatunnel</artifactId>
 
+  <properties>
+    <seatunnel.version>2.3.1</seatunnel.version>
+  </properties>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.linkis</groupId>
@@ -79,7 +83,17 @@
 
     <dependency>
       <groupId>org.apache.seatunnel</groupId>
-      <artifactId>seatunnel-core-spark</artifactId>
+      <artifactId>seatunnel-engine-core</artifactId>
+      <version>${seatunnel.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.hazelcast</groupId>
+      <artifactId>hazelcast</artifactId>
+      <version>5.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.seatunnel</groupId>
+      <artifactId>seatunnel-starter</artifactId>
       <version>${seatunnel.version}</version>
       <exclusions>
         <exclusion>
@@ -90,7 +104,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.seatunnel</groupId>
-      <artifactId>seatunnel-core-flink-sql</artifactId>
+      <artifactId>seatunnel-spark-2-starter</artifactId>
       <version>${seatunnel.version}</version>
       <exclusions>
         <exclusion>
@@ -101,7 +115,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.seatunnel</groupId>
-      <artifactId>seatunnel-core-flink</artifactId>
+      <artifactId>seatunnel-flink-13-starter</artifactId>
       <version>${seatunnel.version}</version>
       <exclusions>
         <exclusion>
@@ -110,7 +124,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-
   </dependencies>
 
   <build>
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkClient.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkSeatunnelZetaClient.java
similarity index 77%
rename from linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkClient.java
rename to linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkSeatunnelZetaClient.java
index 54c2ec27f..be9fdb048 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkClient.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkSeatunnelZetaClient.java
@@ -19,7 +19,7 @@ package org.apache.linkis.engineconnplugin.seatunnel.client;
 
 import org.apache.linkis.engineconn.computation.executor.utlis.JarLoader;
 
-import org.apache.seatunnel.core.flink.SeatunnelFlink;
+import org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -29,8 +29,8 @@ import java.lang.reflect.Method;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class LinkisSeatunnelFlinkClient {
-  private static Logger logger = LoggerFactory.getLogger(LinkisSeatunnelFlinkClient.class);
+public class LinkSeatunnelZetaClient {
+  private static Logger logger = LoggerFactory.getLogger(LinkSeatunnelZetaClient.class);
   private static Class<?> seatunnelEngineClass;
   private static JarLoader jarLoader;
 
@@ -39,16 +39,23 @@ public class LinkisSeatunnelFlinkClient {
       jarLoader =
           new JarLoader(
               new String[] {
-                LinkisSeatunnelFlinkClient.class
+                LinkSeatunnelZetaClient.class
                     .getProtectionDomain()
                     .getCodeSource()
                     .getLocation()
                     .getPath()
               });
       jarLoader.loadClass("org.apache.seatunnel.common.config.Common", false);
-      seatunnelEngineClass = jarLoader.loadClass("org.apache.seatunnel.core.flink.FlinkStarter");
+      jarLoader.loadClass("org.apache.seatunnel.core.base.config.ConfigBuilder", false);
+      //      jarLoader.loadClass("org.apache.seatunnel.core.base.config.PluginFactory", false);
+      seatunnelEngineClass = jarLoader.loadClass("org.apache.seatunnel.core.zeta.ZetaStarter");
       jarLoader.addJarURL(
-          SeatunnelFlink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+          SeaTunnelClient.class
+              .getProtectionDomain()
+              .getCodeSource()
+              .getLocation()
+              .toURI()
+              .getPath());
       Thread.currentThread().setContextClassLoader(jarLoader);
       Method method = seatunnelEngineClass.getDeclaredMethod("main", String[].class);
       return (Integer) method.invoke(null, (Object) args);
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkSQLClient.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkV2Client.java
similarity index 84%
rename from linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkSQLClient.java
rename to linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkV2Client.java
index a9526b104..34300d584 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkSQLClient.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkV2Client.java
@@ -19,7 +19,7 @@ package org.apache.linkis.engineconnplugin.seatunnel.client;
 
 import org.apache.linkis.engineconn.computation.executor.utlis.JarLoader;
 
-import org.apache.seatunnel.core.sql.SeatunnelSql;
+import org.apache.seatunnel.core.starter.flink.SeaTunnelFlink;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -29,8 +29,8 @@ import java.lang.reflect.Method;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class LinkisSeatunnelFlinkSQLClient {
-  private static Logger logger = LoggerFactory.getLogger(LinkisSeatunnelFlinkSQLClient.class);
+public class LinkisSeatunnelFlinkV2Client {
+  private static Logger logger = LoggerFactory.getLogger(LinkisSeatunnelFlinkV2Client.class);
   private static Class<?> seatunnelEngineClass;
   private static JarLoader jarLoader;
 
@@ -39,16 +39,21 @@ public class LinkisSeatunnelFlinkSQLClient {
       jarLoader =
           new JarLoader(
               new String[] {
-                LinkisSeatunnelFlinkSQLClient.class
+                LinkisSeatunnelFlinkV2Client.class
                     .getProtectionDomain()
                     .getCodeSource()
                     .getLocation()
                     .getPath()
               });
       jarLoader.loadClass("org.apache.seatunnel.common.config.Common", false);
-      seatunnelEngineClass = jarLoader.loadClass("org.apache.seatunnel.core.sql.FlinkSqlStarter");
+      seatunnelEngineClass = jarLoader.loadClass("org.apache.seatunnel.core.flink.FlinkV2Starter");
       jarLoader.addJarURL(
-          SeatunnelSql.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
+          SeaTunnelFlink.class
+              .getProtectionDomain()
+              .getCodeSource()
+              .getLocation()
+              .toURI()
+              .getPath());
       Thread.currentThread().setContextClassLoader(jarLoader);
       Method method = seatunnelEngineClass.getDeclaredMethod("main", String[].class);
       return (Integer) method.invoke(null, (Object) args);
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkClient.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkV2Client.java
similarity index 87%
rename from linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkClient.java
rename to linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkV2Client.java
index 7beacd4bd..8ec8661d4 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkClient.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkV2Client.java
@@ -19,7 +19,7 @@ package org.apache.linkis.engineconnplugin.seatunnel.client;
 
 import org.apache.linkis.engineconn.computation.executor.utlis.JarLoader;
 
-import org.apache.seatunnel.core.spark.SeatunnelSpark;
+import org.apache.seatunnel.core.starter.spark.SeaTunnelSpark;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -29,8 +29,8 @@ import java.lang.reflect.Method;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class LinkisSeatunnelSparkClient {
-  private static Logger logger = LoggerFactory.getLogger(LinkisSeatunnelSparkClient.class);
+public class LinkisSeatunnelSparkV2Client {
+  private static Logger logger = LoggerFactory.getLogger(LinkisSeatunnelSparkV2Client.class);
   private static Class<?> seatunnelEngineClass;
   private static JarLoader jarLoader;
 
@@ -39,7 +39,7 @@ public class LinkisSeatunnelSparkClient {
       jarLoader =
           new JarLoader(
               new String[] {
-                LinkisSeatunnelSparkClient.class
+                LinkisSeatunnelSparkV2Client.class
                     .getProtectionDomain()
                     .getCodeSource()
                     .getLocation()
@@ -47,10 +47,10 @@ public class LinkisSeatunnelSparkClient {
               });
       jarLoader.loadClass("org.apache.seatunnel.common.config.Common", false);
       jarLoader.loadClass("org.apache.seatunnel.core.base.config.ConfigBuilder", false);
-      jarLoader.loadClass("org.apache.seatunnel.core.base.config.PluginFactory", false);
-      seatunnelEngineClass = jarLoader.loadClass("org.apache.seatunnel.core.spark.SparkStarter");
+      //      jarLoader.loadClass("org.apache.seatunnel.core.base.config.PluginFactory", false);
+      seatunnelEngineClass = jarLoader.loadClass("org.apache.seatunnel.core.spark.SparkV2Starter");
       jarLoader.addJarURL(
-          SeatunnelSpark.class
+          SeaTunnelSpark.class
               .getProtectionDomain()
               .getCodeSource()
               .getLocation()
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/errorcode/SeatunnelErrorCodeSummary.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/errorcode/SeatunnelErrorCodeSummary.java
index 059914997..66275c178 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/errorcode/SeatunnelErrorCodeSummary.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/errorcode/SeatunnelErrorCodeSummary.java
@@ -26,7 +26,7 @@ public enum SeatunnelErrorCodeSummary implements LinkisErrorCode {
       17023, "Not support method for requestExpectedResource.(不支持 requestExpectedResource 的方法)"),
   EXEC_SPARK_CODE_ERROR(17023, "Exec Seatunnel-Spark Code Error(执行 Seatunnel-Spark 代码错误)"),
   EXEC_FLINK_CODE_ERROR(17023, "Exec Seatunnel-Flink Code Error(执行 Seatunnel-Flink 代码错误)"),
-  EXEC_FLINKSQL_CODE_ERROR(17023, "Exec Seatunnel-FlinkSQL Code Error(执行 Seatunnel-FlinkSQL 代码错误)");
+  EXEC_SEATUNNEL_CODE_ERROR(17023, "Exec Seatunnel-Zeta Code Error(执行 Seatunnel-Zeta 代码错误)");
 
   /** (errorCode)错误码 */
   private final int errorCode;
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java
index 739af25de..04d10e412 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -17,64 +17,90 @@
 
 package org.apache.seatunnel.common.config;
 
+import org.apache.linkis.engineconnplugin.seatunnel.client.exception.JobExecutionException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
 
 public class Common {
+  public static final Log logger = LogFactory.getLog(Common.class.getName());
 
   private Common() {
     throw new IllegalStateException("Utility class");
   }
 
-  private static final List<String> ALLOWED_MODES =
-      Arrays.stream(DeployMode.values()).map(DeployMode::getName).collect(Collectors.toList());
+  public static final int COLLECTION_SIZE = 16;
+
+  private static final int APP_LIB_DIR_DEPTH = 2;
+
+  private static final int PLUGIN_LIB_DIR_DEPTH = 3;
+
+  private static DeployMode MODE;
 
-  private static Optional<String> MODE = Optional.empty();
+  private static String SEATUNNEL_HOME;
 
-  public static boolean isModeAllowed(String mode) {
-    return ALLOWED_MODES.contains(mode.toLowerCase());
+  private static boolean STARTER = false;
+
+  public static void setDeployMode(DeployMode mode) {
+    MODE = mode;
   }
 
-  /** Set mode. return false in case of failure */
-  public static Boolean setDeployMode(String m) {
-    if (isModeAllowed(m)) {
-      MODE = Optional.of(m);
-      return true;
-    } else {
-      return false;
-    }
+  public static void setStarter(boolean inStarter) {
+    STARTER = inStarter;
   }
 
-  public static Optional<String> getDeployMode() {
+  public static DeployMode getDeployMode() {
     return MODE;
   }
 
-  /**
-   * Root dir varies between different spark master and deploy mode, it also varies between relative
-   * and absolute path. When running seatunnel in --master local, you can put plugins related files
-   * in $project_dir/plugins, then these files will be automatically copied to
-   * $project_dir/seatunnel-core/target and token in effect if you start seatunnel in IDE tools such
-   * as IDEA. When running seatunnel in --master yarn or --master mesos, you can put plugins related
-   * files in plugins dir.
-   */
+  public static Path appStarterDir() {
+    return appRootDir().resolve("starter");
+  }
+
+  private static String getSeaTunnelHome() {
+
+    if (StringUtils.isNotEmpty(SEATUNNEL_HOME)) {
+      return SEATUNNEL_HOME;
+    }
+    String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
+    if (StringUtils.isBlank(seatunnelHome)) {
+      seatunnelHome = System.getenv("SEATUNNEL_HOME");
+    }
+    if (StringUtils.isBlank(seatunnelHome)) {
+      seatunnelHome = appRootDir().toString();
+    }
+    SEATUNNEL_HOME = seatunnelHome;
+    return SEATUNNEL_HOME;
+  }
+
   public static Path appRootDir() {
-    if (MODE.equals(Optional.of(DeployMode.CLIENT.getName()))) {
+    logger.info("Mode:" + MODE + ",Starter:" + STARTER);
+    if (DeployMode.CLIENT == MODE || DeployMode.RUN == MODE || STARTER) {
       try {
         String path = System.getProperty("SEATUNNEL_HOME") + "/seatunnel";
         path = new File(path).getPath();
+        logger.info("appRootDir:" + path);
         return Paths.get(path);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-    } else if (MODE.equals(Optional.of(DeployMode.CLUSTER.getName()))) {
+    } else if (DeployMode.CLUSTER == MODE || DeployMode.RUN_APPLICATION == MODE) {
       return Paths.get("");
     } else {
-      throw new IllegalStateException("MODE not support : " + MODE.orElse("null"));
+      throw new IllegalStateException("deploy mode not support : " + MODE);
     }
   }
 
@@ -97,7 +123,6 @@ public class Common {
     return Paths.get(appRootDir().toString(), "connectors", engine.toLowerCase());
   }
 
-  /** Plugin Connector Dir */
   public static Path connectorDir() {
     return Paths.get(appRootDir().toString(), "connectors");
   }
@@ -106,18 +131,47 @@ public class Common {
     return appRootDir().resolve("plugins.tar.gz");
   }
 
-  /** Get specific plugin dir */
-  public static Path pluginDir(String pluginName) {
-    return Paths.get(pluginRootDir().toString(), pluginName);
+  public static List<Path> getPluginsJarDependencies() {
+    Path pluginRootDir = Common.pluginRootDir();
+    if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
+      return Collections.emptyList();
+    }
+    try (Stream<Path> stream = Files.walk(pluginRootDir, PLUGIN_LIB_DIR_DEPTH, FOLLOW_LINKS)) {
+      return stream
+          .filter(it -> pluginRootDir.relativize(it).getNameCount() == PLUGIN_LIB_DIR_DEPTH)
+          .filter(it -> it.getParent().endsWith("lib"))
+          .filter(it -> it.getFileName().toString().endsWith(".jar"))
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new JobExecutionException(e.getMessage(), e);
+    }
+  }
+
+  public static Path libDir() {
+    return appRootDir().resolve("lib");
   }
 
-  /** Get files dir of specific plugin */
-  public static Path pluginFilesDir(String pluginName) {
-    return Paths.get(pluginDir(pluginName).toString(), "files");
+  public static List<Path> getLibJars() {
+    Path libRootDir = Common.libDir();
+    if (!Files.exists(libRootDir) || !Files.isDirectory(libRootDir)) {
+      return Collections.emptyList();
+    }
+    try (Stream<Path> stream = Files.walk(libRootDir, APP_LIB_DIR_DEPTH, FOLLOW_LINKS)) {
+      return stream
+          .filter(it -> !it.toFile().isDirectory())
+          .filter(it -> it.getFileName().toString().endsWith(".jar"))
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new JobExecutionException(e.getMessage(), e);
+    }
   }
 
-  /** Get lib dir of specific plugin */
-  public static Path pluginLibDir(String pluginName) {
-    return Paths.get(pluginDir(pluginName).toString(), "lib");
+  public static Set<Path> getThirdPartyJars(String paths) {
+    logger.info("getThirdPartyJars path:" + paths);
+    return Arrays.stream(paths.split(";"))
+        .filter(s -> !"".equals(s))
+        .filter(it -> it.endsWith(".jar"))
+        .map(path -> Paths.get(URI.create(path)))
+        .collect(Collectors.toSet());
   }
 }
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 1aa357966..7cd367b23 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.core.base.config;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -28,22 +27,18 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
 import java.nio.file.Path;
 
-/**
- * Used to build the {@link Config} from file.
- *
- * @param <ENVIRONMENT> environment type.
- */
-public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
+public class ConfigBuilder {
 
   public static final Log LOGGER = LogFactory.getLog(ConfigBuilder.class.getName());
+
   private static final String PLUGIN_NAME_KEY = "plugin_name";
+
   private final Path configFile;
-  private final EngineType engine;
+
   private final Config config;
 
-  public ConfigBuilder(Path configFile, EngineType engine) {
+  public ConfigBuilder(Path configFile) {
     this.configFile = configFile;
-    this.engine = engine;
     this.config = load();
   }
 
@@ -72,15 +67,4 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
   public Config getConfig() {
     return config;
   }
-
-  /** check if config is valid. */
-  public void checkConfig() {
-    // check environment
-    ENVIRONMENT environment = new EnvironmentFactory<ENVIRONMENT>(config, engine).getEnvironment();
-    // check plugins
-    PluginFactory<ENVIRONMENT> pluginFactory = new PluginFactory<>(config, engine);
-    pluginFactory.createPlugins(PluginType.SOURCE);
-    pluginFactory.createPlugins(PluginType.TRANSFORM);
-    pluginFactory.createPlugins(PluginType.SINK);
-  }
 }
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
deleted file mode 100644
index 2070106e8..000000000
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.base.config;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.apis.base.plugin.Plugin;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.flink.BaseFlinkSink;
-import org.apache.seatunnel.flink.BaseFlinkSource;
-import org.apache.seatunnel.flink.BaseFlinkTransform;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-import org.apache.seatunnel.spark.BaseSparkSink;
-import org.apache.seatunnel.spark.BaseSparkSource;
-import org.apache.seatunnel.spark.BaseSparkTransform;
-
-import javax.annotation.Nonnull;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.*;
-import java.util.stream.Collectors;
-
-/**
- * Used to load the plugins.
- *
- * @param <ENVIRONMENT> environment
- */
-public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
-
-  public static final Log LOGGER = LogFactory.getLog(PluginFactory.class.getName());
-  private final Config config;
-  private final EngineType engineType;
-  private static final Map<EngineType, Map<PluginType, Class<?>>> PLUGIN_BASE_CLASS_MAP;
-
-  private static final String PLUGIN_NAME_KEY = "plugin_name";
-  private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
-
-  private final List<URL> pluginJarPaths;
-  private final ClassLoader defaultClassLoader;
-
-  static {
-    PLUGIN_BASE_CLASS_MAP = new HashMap<>();
-    Map<PluginType, Class<?>> sparkBaseClassMap = new HashMap<>();
-    sparkBaseClassMap.put(PluginType.SOURCE, BaseSparkSource.class);
-    sparkBaseClassMap.put(PluginType.TRANSFORM, BaseSparkTransform.class);
-    sparkBaseClassMap.put(PluginType.SINK, BaseSparkSink.class);
-    PLUGIN_BASE_CLASS_MAP.put(EngineType.SPARK, sparkBaseClassMap);
-
-    Map<PluginType, Class<?>> flinkBaseClassMap = new HashMap<>();
-    flinkBaseClassMap.put(PluginType.SOURCE, BaseFlinkSource.class);
-    flinkBaseClassMap.put(PluginType.TRANSFORM, BaseFlinkTransform.class);
-    flinkBaseClassMap.put(PluginType.SINK, BaseFlinkSink.class);
-    PLUGIN_BASE_CLASS_MAP.put(EngineType.FLINK, flinkBaseClassMap);
-  }
-
-  public PluginFactory(Config config, EngineType engineType) {
-    this.config = config;
-    this.engineType = engineType;
-    this.pluginJarPaths = searchPluginJar();
-    this.defaultClassLoader = initClassLoaderWithPaths(this.pluginJarPaths);
-  }
-
-  private ClassLoader initClassLoaderWithPaths(List<URL> pluginJarPaths) {
-    return new URLClassLoader(
-        pluginJarPaths.toArray(new URL[0]), Thread.currentThread().getContextClassLoader());
-  }
-
-  @Nonnull
-  private List<URL> searchPluginJar() {
-
-    File pluginDir = Common.connectorJarDir(this.engineType.getEngine()).toFile();
-    if (!pluginDir.exists() || pluginDir.listFiles() == null) {
-      return new ArrayList<>();
-    }
-    Config pluginMapping =
-        ConfigFactory.parseFile(new File(getPluginMappingPath()))
-            .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-            .resolveWith(
-                ConfigFactory.systemProperties(),
-                ConfigResolveOptions.defaults().setAllowUnresolved(true));
-    File[] plugins =
-        Arrays.stream(pluginDir.listFiles())
-            .filter(f -> f.getName().endsWith(".jar"))
-            .toArray(File[]::new);
-
-    return Arrays.stream(PluginType.values())
-        .filter(type -> !PluginType.TRANSFORM.equals(type))
-        .flatMap(
-            type -> {
-              List<URL> pluginList = new ArrayList<>();
-              List<? extends Config> configList = config.getConfigList(type.getType());
-              configList.forEach(
-                  pluginConfig -> {
-                    Optional<String> mappingValue =
-                        getPluginMappingValue(
-                            pluginMapping, type, pluginConfig.getString(PLUGIN_NAME_KEY));
-                    if (mappingValue.isPresent()) {
-                      try {
-                        for (File plugin : plugins) {
-                          if (plugin.getName().startsWith(mappingValue.get())) {
-                            pluginList.add(plugin.toURI().toURL());
-                            break;
-                          }
-                        }
-                      } catch (MalformedURLException e) {
-                        LOGGER.warn("can get plugin url", e);
-                      }
-                    } else {
-                      throw new IllegalArgumentException(
-                          String.format(
-                              "can't find connector %s in "
-                                  + "%s. If you add connector to connectors dictionary, please modify this "
-                                  + "file.",
-                              getPluginMappingKey(type, pluginConfig.getString(PLUGIN_NAME_KEY)),
-                              getPluginMappingPath()));
-                    }
-                  });
-              return pluginList.stream();
-            })
-        .collect(Collectors.toList());
-  }
-
-  public List<URL> getPluginJarPaths() {
-    return this.pluginJarPaths;
-  }
-
-  private String getPluginMappingPath() {
-    return Common.connectorDir() + "/" + PLUGIN_MAPPING_FILE;
-  }
-
-  private String getPluginMappingKey(PluginType type, String pluginName) {
-    return this.engineType.getEngine() + "." + type.getType() + "." + pluginName;
-  }
-
-  Optional<String> getPluginMappingValue(Config pluginMapping, PluginType type, String pluginName) {
-
-    return pluginMapping.getConfig(this.engineType.getEngine()).getConfig(type.getType()).entrySet()
-        .stream()
-        .filter(entry -> entry.getKey().equalsIgnoreCase(pluginName))
-        .map(entry -> entry.getValue().unwrapped().toString())
-        .findAny();
-  }
-
-  /**
-   * Create the plugins by plugin type.
-   *
-   * @param type plugin type
-   * @param <T> plugin
-   * @return plugin list.
-   */
-  @SuppressWarnings("unchecked")
-  public <T extends Plugin<ENVIRONMENT>> List<T> createPlugins(PluginType type) {
-    Objects.requireNonNull(type, "PluginType can not be null when create plugins!");
-    List<T> basePluginList = new ArrayList<>();
-    List<? extends Config> configList = config.getConfigList(type.getType());
-    configList.forEach(
-        plugin -> {
-          try {
-            T t =
-                (T)
-                    createPluginInstanceIgnoreCase(
-                        type, plugin.getString(PLUGIN_NAME_KEY), this.defaultClassLoader);
-            t.setConfig(plugin);
-            basePluginList.add(t);
-          } catch (Exception e) {
-            throw new RuntimeException(e);
-          }
-        });
-
-    return basePluginList;
-  }
-
-  /** create plugin class instance, ignore case. */
-  @SuppressWarnings("unchecked")
-  private Plugin<?> createPluginInstanceIgnoreCase(
-      PluginType pluginType, String pluginName, ClassLoader classLoader) throws Exception {
-    Class<Plugin<?>> pluginBaseClass =
-        (Class<Plugin<?>>) getPluginBaseClass(engineType, pluginType);
-
-    if (pluginName.split("\\.").length != 1) {
-      // canonical class name
-      Class<Plugin<?>> pluginClass = (Class<Plugin<?>>) Class.forName(pluginName);
-      if (pluginClass.isAssignableFrom(pluginBaseClass)) {
-        throw new IllegalArgumentException(
-            "plugin: " + pluginName + " is not extends from " + pluginBaseClass);
-      }
-      return pluginClass.getDeclaredConstructor().newInstance();
-    }
-    ServiceLoader<Plugin<?>> plugins = ServiceLoader.load(pluginBaseClass, classLoader);
-    for (Iterator<Plugin<?>> it = plugins.iterator(); it.hasNext(); ) {
-      try {
-        Plugin<?> plugin = it.next();
-        if (StringUtils.equalsIgnoreCase(plugin.getPluginName(), pluginName)) {
-          return plugin;
-        }
-      } catch (ServiceConfigurationError e) {
-        // Iterator.next() may throw ServiceConfigurationError,
-        // but maybe caused by a not used plugin in this job
-        LOGGER.warn("Error when load plugin:" + pluginName, e);
-      }
-    }
-    throw new ClassNotFoundException("Plugin class not found by name :[" + pluginName + "]");
-  }
-
-  private Class<?> getPluginBaseClass(EngineType engineType, PluginType pluginType) {
-    if (!PLUGIN_BASE_CLASS_MAP.containsKey(engineType)) {
-      throw new IllegalStateException("PluginType not support : [" + 
pluginType + "]");
-    }
-    Map<PluginType, Class<?>> pluginTypeClassMap = PLUGIN_BASE_CLASS_MAP.get(engineType);
-    if (!pluginTypeClassMap.containsKey(pluginType)) {
-      throw new IllegalStateException(pluginType + " is not supported in engine " + engineType);
-    }
-    return pluginTypeClassMap.get(pluginType);
-  }
-}
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
deleted file mode 100644
index ffa7af7bf..000000000
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.flink;
-
-import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.core.base.Starter;
-import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
-
-import java.util.List;
-
-/**
- * The SeaTunnel flink starter. This class is responsible for generate the final flink job execute
- * command.
- */
-public class FlinkStarter implements Starter {
-  public static final Log logger = LogFactory.getLog(FlinkStarter.class.getName());
-  private static final String APP_NAME = SeatunnelFlink.class.getName();
-  private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";
-
-  /** SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf` */
-  private final FlinkCommandArgs flinkCommandArgs;
-
-  /** SeaTunnel flink job jar. */
-  private final String appJar;
-
-  FlinkStarter(String[] args) {
-    this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
-    // set the deployment mode, used to get the job jar path.
-    Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
-    this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
-  }
-
-  @SuppressWarnings("checkstyle:RegexpSingleline")
-  public static int main(String[] args) {
-    logger.info("FlinkStarter start");
-    int exitCode = 0;
-    try {
-      FlinkStarter flinkStarter = new FlinkStarter(args);
-      String commandVal = String.join(" ", flinkStarter.buildCommands());
-      logger.info("commandVal:" + commandVal);
-      exitCode = SeatunnelUtils.executeLine(commandVal);
-    } catch (Exception e) {
-      exitCode = 1;
-      logger.error("\n\n该任务最可能的错误原因是:\n" + e);
-    }
-    return exitCode;
-  }
-
-  @Override
-  public List<String> buildCommands() {
-    return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
-  }
-}
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkV2Starter.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkV2Starter.java
new file mode 100644
index 000000000..dde6f6c3b
--- /dev/null
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkV2Starter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink;
+
+import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.starter.Starter;
+import org.apache.seatunnel.core.starter.enums.EngineType;
+import org.apache.seatunnel.core.starter.flink.SeaTunnelFlink;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+public class FlinkV2Starter implements Starter {
+  public static final Log logger = LogFactory.getLog(FlinkV2Starter.class.getName());
+  private static final String APP_NAME = SeaTunnelFlink.class.getName();
+  private static final String APP_JAR_NAME = EngineType.FLINK13.getStarterJarName();
+  private static final String SHELL_NAME = EngineType.FLINK13.getStarterShellName();
+
+  /** SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf` */
+  private final FlinkCommandArgs flinkCommandArgs;
+
+  /** SeaTunnel flink job jar. */
+  private final String appJar;
+
+  FlinkV2Starter(String[] args) {
+    this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), SHELL_NAME, true);
+    logger.info("this.flinkCommandArgs = " + this.flinkCommandArgs);
+    // set the deployment mode, used to get the job jar path.
+    Common.setDeployMode(flinkCommandArgs.getDeployMode());
+    Common.setStarter(true);
+    this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
+  }
+
+  @SuppressWarnings("checkstyle:RegexpSingleline")
+  public static int main(String[] args) {
+    logger.info("FlinkStarter start:" + Arrays.toString(args));
+    int exitCode = 0;
+    try {
+      FlinkV2Starter flinkStarter = new FlinkV2Starter(args);
+      String commandVal = String.join(" ", flinkStarter.buildCommands());
+      logger.info("FlinkV2Starter commandVal:" + commandVal);
+      exitCode = SeatunnelUtils.executeLine(commandVal);
+    } catch (Exception e) {
+      exitCode = 1;
+      logger.error("\n\nFlinkV2Starter error:\n" + e);
+    }
+    return exitCode;
+  }
+
+  @Override
+  public List<String> buildCommands() {
+    List<String> command = new ArrayList<>();
+    command.add("${FLINK_HOME}/bin/flink");
+    // set deploy mode, run or run-application
+    command.add(flinkCommandArgs.getDeployMode().getDeployMode());
+    // set submitted target master
+    if (flinkCommandArgs.getMasterType() != null) {
+      command.add("--target");
+      command.add(flinkCommandArgs.getMasterType().getMaster());
+    }
+    logger.info("FlinkV2Starter OriginalParameters:" + 
flinkCommandArgs.getOriginalParameters());
+    command.add("-c");
+    command.add(APP_NAME);
+    command.add(appJar);
+    command.add("--config");
+    command.add(flinkCommandArgs.getConfigFile());
+    command.add("--name");
+    command.add(flinkCommandArgs.getJobName());
+    // set System properties
+    flinkCommandArgs.getVariables().stream()
+        .filter(Objects::nonNull)
+        .map(String::trim)
+        .forEach(variable -> command.add("-D" + variable));
+    return command;
+  }
+}
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkV2Starter.java
similarity index 68%
rename from linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
rename to linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkV2Starter.java
index 60a0204c7..5f905731b 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkV2Starter.java
@@ -22,26 +22,34 @@ import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.base.Starter;
-import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.config.ConfigParser;
-import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.PluginFactory;
-import org.apache.seatunnel.core.base.utils.CompressionUtils;
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.Starter;
+import org.apache.seatunnel.core.starter.enums.EngineType;
+import org.apache.seatunnel.core.starter.enums.PluginType;
+import org.apache.seatunnel.core.starter.spark.SeaTunnelSpark;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+import org.apache.seatunnel.core.starter.utils.CompressionUtils;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -50,8 +58,8 @@ import com.beust.jcommander.UnixStyleUsageFormatter;
 
 import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
 
-public class SparkStarter implements Starter {
-  public static final Log logger = LogFactory.getLog(SparkStarter.class.getName());
+public class SparkV2Starter implements Starter {
+  public static final Log logger = LogFactory.getLog(SparkV2Starter.class.getName());
 
   private static final int USAGE_EXIT_CODE = 234;
 
@@ -75,7 +83,7 @@ public class SparkStarter implements Starter {
   /** spark configuration properties */
   protected Map<String, String> sparkConf;
 
-  private SparkStarter(String[] args, SparkCommandArgs commandArgs) {
+  private SparkV2Starter(String[] args, SparkCommandArgs commandArgs) {
     this.args = args;
     this.commandArgs = commandArgs;
   }
@@ -85,14 +93,14 @@ public class SparkStarter implements Starter {
     int exitCode = 0;
     logger.info("starter start");
     try {
-      SparkStarter starter = getInstance(args);
+      SparkV2Starter starter = getInstance(args);
       List<String> command = starter.buildCommands();
       String commandVal = String.join(" ", command);
-      logger.info("commandVal:" + commandVal);
+      logger.info("sparkV2starter commandVal:" + commandVal);
       exitCode = SeatunnelUtils.executeLine(commandVal);
     } catch (Exception e) {
       exitCode = 1;
-      logger.error("\n\n该任务最可能的错误原因是:\n" + e);
+      logger.error("\n\nsparkV2Starter error:\n" + e);
     }
     return exitCode;
   }
@@ -101,8 +109,10 @@ public class SparkStarter implements Starter {
   * method to get SparkStarter instance, will return {@link ClusterModeSparkStarter} or {@link
    * ClientModeSparkStarter} depending on deploy mode.
    */
-  static SparkStarter getInstance(String[] args) {
-    SparkCommandArgs commandArgs = parseCommandArgs(args);
+  static SparkV2Starter getInstance(String[] args) {
+    SparkCommandArgs commandArgs =
+        CommandLineUtils.parse(
+            args, new SparkCommandArgs(), EngineType.SPARK2.getStarterShellName(), true);
     DeployMode deployMode = commandArgs.getDeployMode();
     switch (deployMode) {
       case CLUSTER:
@@ -135,11 +145,15 @@ public class SparkStarter implements Starter {
   public List<String> buildCommands() throws IOException {
     setSparkConf();
     logger.info("setSparkConf start");
-    logger.info(commandArgs.getDeployMode().getName());
-    Common.setDeployMode(commandArgs.getDeployMode().getName());
-    this.jars.addAll(getPluginsJarDependencies());
-    this.jars.addAll(listJars(Common.appLibDir()));
+    logger.info(commandArgs.getDeployMode().toString());
+    Common.setDeployMode(commandArgs.getDeployMode());
+    Common.setStarter(true);
+    this.jars.addAll(Common.getPluginsJarDependencies());
+    this.jars.addAll(Common.getLibJars());
     this.jars.addAll(getConnectorJarDependencies());
+    this.jars.addAll(
+        new ArrayList<>(
+            Common.getThirdPartyJars(sparkConf.getOrDefault(EnvCommonOptions.JARS.key(), ""))));
     this.appName = this.sparkConf.getOrDefault("spark.app.name", Constants.LOGO);
     logger.info("buildFinal end");
     return buildFinal();
@@ -167,7 +181,19 @@ public class SparkStarter implements Starter {
 
   /** Get spark configurations from SeaTunnel job config file. */
   static Map<String, String> getSparkConf(String configFile) throws FileNotFoundException {
-    return ConfigParser.getConfigEnvValues(configFile);
+    File file = new File(configFile);
+    if (!file.exists()) {
+      throw new FileNotFoundException("config file '" + file + "' does not exist!");
+    }
+    Config appConfig =
+        ConfigFactory.parseFile(file)
+            .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+            .resolveWith(
+                ConfigFactory.systemProperties(),
+                ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+    return appConfig.getConfig("env").entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().unwrapped().toString()));
   }
 
   /** return plugin's dependent jars, which located in 'plugins/${pluginName}/lib/*'. */
@@ -185,18 +211,45 @@ public class SparkStarter implements Starter {
     }
   }
 
-  /** return connector's jars, which located in 'connectors/spark/*'. */
+  /** return connector's jars, which located in 'connectors/spark/*'. Changed in 2.3.0 to link to the 'seatunnel' connectors directory. */
   private List<Path> getConnectorJarDependencies() {
-    Path pluginRootDir = Common.connectorJarDir("SPARK");
+    Path pluginRootDir = Common.connectorJarDir("seatunnel");
     if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
       return Collections.emptyList();
     }
-    // Config config = ConfigFactory.parseFile(Paths.get(commandArgs.getConfigFile()).toFile());
-    Config config =
-        new ConfigBuilder<>(Paths.get(commandArgs.getConfigFile()), EngineType.SPARK).getConfig();
-    PluginFactory<RuntimeEnv> pluginFactory = new PluginFactory<>(config, EngineType.SPARK);
-    return pluginFactory.getPluginJarPaths().stream()
-        .map(url -> new File(url.getPath()).toPath())
+    Config config = ConfigBuilder.of(commandArgs.getConfigFile());
+    Set<URL> pluginJars = new HashSet<>();
+    SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
+        new SeaTunnelSourcePluginDiscovery();
+    SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+    pluginJars.addAll(
+        seaTunnelSourcePluginDiscovery.getPluginJarPaths(
+            getPluginIdentifiers(config, PluginType.SOURCE)));
+    pluginJars.addAll(
+        seaTunnelSinkPluginDiscovery.getPluginJarPaths(
+            getPluginIdentifiers(config, PluginType.SINK)));
+    List<Path> connectPaths =
+        pluginJars.stream()
+            .map(url -> new File(url.getPath()).toPath())
+            .collect(Collectors.toList());
+    logger.info("getConnector jar paths:" + connectPaths.toString());
+    return connectPaths;
+  }
+
+  private List<PluginIdentifier> getPluginIdentifiers(Config config, PluginType... pluginTypes) {
+    return Arrays.stream(pluginTypes)
+        .flatMap(
+            (Function<PluginType, Stream<PluginIdentifier>>)
+                pluginType -> {
+                  List<? extends Config> configList = config.getConfigList(pluginType.getType());
+                  return configList.stream()
+                      .map(
+                          pluginConfig ->
+                              PluginIdentifier.of(
+                                  "seatunnel",
+                                  pluginType.getType(),
+                                  pluginConfig.getString("plugin_name")));
+                })
         .collect(Collectors.toList());
   }
 
@@ -213,16 +266,20 @@ public class SparkStarter implements Starter {
   /** build final spark-submit commands */
   protected List<String> buildFinal() {
     List<String> commands = new ArrayList<>();
-    commands.add("${SPARK_HOME}/bin/spark-submit");
-    appendOption(commands, "--class", SeatunnelSpark.class.getName());
+    commands.add(System.getenv("SPARK_HOME") + "/bin/spark-submit");
+    appendOption(commands, "--class", SeaTunnelSpark.class.getName());
     appendOption(commands, "--name", this.appName);
     appendOption(commands, "--master", this.commandArgs.getMaster());
-    appendOption(commands, "--deploy-mode", 
this.commandArgs.getDeployMode().getName());
+    appendOption(commands, "--deploy-mode", 
this.commandArgs.getDeployMode().getDeployMode());
     appendJars(commands, this.jars);
     appendFiles(commands, this.files);
     appendSparkConf(commands, this.sparkConf);
     appendAppJar(commands);
     appendArgs(commands, args);
+    if (this.commandArgs.isCheckConfig()) {
+      commands.add("--check");
+    }
+    logger.info("build command:" + commands);
     return commands;
   }
 
@@ -266,11 +323,15 @@ public class SparkStarter implements Starter {
 
   /** append appJar to StringBuilder */
   protected void appendAppJar(List<String> commands) {
-    commands.add(Common.appLibDir().resolve("seatunnel-core-spark.jar").toString());
+    //    commands.add(Common.appLibDir().resolve("seatunnel-spark-starter.jar").toString());
+    String appJarPath =
+        Common.appStarterDir().resolve(EngineType.SPARK2.getStarterJarName()).toString();
+    logger.info("spark appJarPath:" + appJarPath);
+    commands.add(appJarPath);
   }
 
   /** a Starter for building spark-submit commands with client mode options */
-  private static class ClientModeSparkStarter extends SparkStarter {
+  private static class ClientModeSparkStarter extends SparkV2Starter {
 
     /** client mode specified spark options */
     private enum ClientModeSparkConfigs {
@@ -329,7 +390,7 @@ public class SparkStarter implements Starter {
   }
 
   /** a Starter for building spark-submit commands with cluster mode options */
-  private static class ClusterModeSparkStarter extends SparkStarter {
+  private static class ClusterModeSparkStarter extends SparkV2Starter {
 
     private ClusterModeSparkStarter(String[] args, SparkCommandArgs commandArgs) {
       super(args, commandArgs);
@@ -337,11 +398,10 @@ public class SparkStarter implements Starter {
 
     @Override
     public List<String> buildCommands() throws IOException {
-      Common.setDeployMode(commandArgs.getDeployMode().getName());
+      Common.setDeployMode(commandArgs.getDeployMode());
+      Common.setStarter(true);
       Path pluginTarball = Common.pluginTarball();
-      if (Files.notExists(pluginTarball)) {
-        CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
-      }
+      CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
       this.files.add(pluginTarball);
       this.files.add(Paths.get(commandArgs.getConfigFile()));
       return super.buildCommands();
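
For orientation only: buildFinal() above assembles a spark-submit invocation of roughly the shape
sketched below. The master, deploy mode, jar/file paths and conf entries are illustrative
placeholders; the original CLI args are passed through after the starter jar, and --check is
appended only when the parsed command args request a config check.

    ${SPARK_HOME}/bin/spark-submit --class org.apache.seatunnel.core.starter.spark.SeaTunnelSpark \
        --name <spark.app.name> --master yarn --deploy-mode client \
        --jars <plugin,lib,connector jars> --files <plugin tarball>,<job.conf> \
        --conf <key>=<value> <appStarterDir>/<spark-starter>.jar <original args> --check
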
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
deleted file mode 100644
index b7fd2fec6..000000000
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.sql;
-
-import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.core.base.Starter;
-import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
-
-import java.util.List;
-
-public class FlinkSqlStarter implements Starter {
-  public static final Log logger = LogFactory.getLog(FlinkSqlStarter.class.getName());
-  private static final String APP_JAR_NAME = "seatunnel-core-flink-sql.jar";
-  private static final String CLASS_NAME = SeatunnelSql.class.getName();
-
-  private final FlinkCommandArgs flinkCommandArgs;
-  /** SeaTunnel flink sql job jar. */
-  private final String appJar;
-
-  FlinkSqlStarter(String[] args) {
-    this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
-    // set the deployment mode, used to get the job jar path.
-    Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
-    this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
-  }
-
-  @Override
-  public List<String> buildCommands() throws Exception {
-    return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar);
-  }
-
-  @SuppressWarnings("checkstyle:RegexpSingleline")
-  public static int main(String[] args) {
-    int exitCode = 0;
-    logger.info("FlinkSqlStarter start");
-    try {
-      FlinkSqlStarter flinkSqlStarter = new FlinkSqlStarter(args);
-      String commandVal = String.join(" ", flinkSqlStarter.buildCommands());
-      logger.info("commandVal:" + commandVal);
-      exitCode = SeatunnelUtils.executeLine(commandVal);
-    } catch (Exception e) {
-      exitCode = 1;
-      logger.error("\n\n该任务最可能的错误原因是:\n" + e);
-    }
-
-    return exitCode;
-  }
-}
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/zeta/ZetaStarter.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/zeta/ZetaStarter.java
new file mode 100644
index 000000000..59d90b9d9
--- /dev/null
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/zeta/ZetaStarter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.zeta;
+
+import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.starter.Starter;
+import org.apache.seatunnel.core.starter.enums.EngineType;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class ZetaStarter implements Starter {
+  public static final Log logger = LogFactory.getLog(ZetaStarter.class.getName());
+
+  private static final String APP_JAR_NAME = EngineType.SEATUNNEL.getStarterJarName();
+  private static final String SHELL_NAME = EngineType.SEATUNNEL.getStarterShellName();
+  private static final String namePrefix = "seaTunnel";
+  private final ClientCommandArgs commandArgs;
+  private final String appJar;
+
+  ZetaStarter(String[] args) {
+    this.commandArgs = CommandLineUtils.parse(args, new ClientCommandArgs(), SHELL_NAME, true);
+    logger.info("this.commandArgs = " + this.commandArgs);
+    // set the deployment mode, used to get the job jar path.
+    Common.setDeployMode(commandArgs.getDeployMode());
+    Common.setStarter(true);
+    this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
+  }
+
+  public static int main(String[] args) {
+    int exitCode = 0;
+    try {
+      logger.info("seaTunnel Zeta process..");
+      ZetaStarter zetaStarter = new ZetaStarter(args);
+      String commandVal = String.join(" ", zetaStarter.buildCommands());
+      logger.info("ZetaStarter commandVal:" + commandVal);
+      exitCode = SeatunnelUtils.executeLine(commandVal);
+    } catch (Exception e) {
+      exitCode = 1;
+      logger.error("\n\nZetaStarter error:\n" + e);
+    }
+    return exitCode;
+  }
+
+  @Override
+  public List<String> buildCommands() {
+    List<String> command = new ArrayList<>();
+    command.add("${SEATUNNEL_HOME}/bin/" + SHELL_NAME);
+    command.add("--master");
+    command.add(this.commandArgs.getMasterType().name());
+    command.add("--cluster");
+    command.add(
+        StringUtils.isNotBlank(this.commandArgs.getClusterName())
+            ? this.commandArgs.getClusterName()
+            : randomClusterName());
+    command.add("--config");
+    command.add(this.commandArgs.getConfigFile());
+    command.add("--name");
+    command.add(this.commandArgs.getJobName());
+    return command;
+  }
+
+  public String randomClusterName() {
+    Random random = new Random();
+    return namePrefix + "-" + random.nextInt(1000000);
+  }
+}
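
For orientation only: ZetaStarter.buildCommands() above produces a call to the SeaTunnel Zeta shell
of roughly the shape sketched below. The shell script name comes from
EngineType.SEATUNNEL.getStarterShellName(); the master, config path and job name are illustrative
placeholders, and the cluster name falls back to the random "seaTunnel-<n>" value generated by
randomClusterName() when none is supplied.

    ${SEATUNNEL_HOME}/bin/<seatunnel-starter-shell> --master CLUSTER --cluster seaTunnel-123456 \
        --config /path/to/job.conf --name my-seatunnel-job
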
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
index bce3a0d38..5f42fa6aa 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
@@ -24,16 +24,23 @@ object SeatunnelFlinkEnvConfiguration {
   val LINKIS_FLINK_RUNMODE: CommonVars[String] =
     CommonVars[String]("linkis.flink.run.mode", "run-mode")
 
+  val LINKIS_FLINK_DEPLOY_MODE: CommonVars[String] =
+    CommonVars[String]("linkis.flink.delpoy.mode", "deploy-mode")
+
   val LINKIS_FLINK_CONFIG: CommonVars[String] =
     CommonVars[String]("linkis.flink.config", "config")
 
   val LINKIS_FLINK_VARIABLE: CommonVars[String] =
     CommonVars[String]("linkis.flink.variable", "variable")
 
+  val LINKIS_FLINK_MASTER: CommonVars[String] =
+    CommonVars[String]("linkis.flink.master", "master")
+
   val LINKIS_FLINK_CHECK: CommonVars[String] = CommonVars[String]("linkis.flink.check", "check")
 
-  val GET_LINKIS_FLINK_RUNMODE = "--" + LINKIS_FLINK_RUNMODE.getValue
+  val GET_LINKIS_FLINK_DEPLOY_MODE = "--" + LINKIS_FLINK_DEPLOY_MODE.getValue
   val GET_LINKIS_FLINK_CONFIG = "--" + LINKIS_FLINK_CONFIG.getValue
   val GET_LINKIS_FLINK_VARIABLE = "--" + LINKIS_FLINK_VARIABLE.getValue
   val GET_LINKIS_FLINK_CHECK = "--" + LINKIS_FLINK_CHECK.getValue
+  val GET_LINKIS_FLINK_MASTER = "--" + LINKIS_FLINK_MASTER.getValue
 }
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelZetaEnvConfiguration.scala
similarity index 52%
copy from linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
copy to linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelZetaEnvConfiguration.scala
index bce3a0d38..335f81d19 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelZetaEnvConfiguration.scala
@@ -19,21 +19,23 @@ package org.apache.linkis.engineconnplugin.seatunnel.config
 
 import org.apache.linkis.common.conf.CommonVars
 
-object SeatunnelFlinkEnvConfiguration {
+object SeatunnelZetaEnvConfiguration {
 
-  val LINKIS_FLINK_RUNMODE: CommonVars[String] =
-    CommonVars[String]("linkis.flink.run.mode", "run-mode")
+  val LINKIS_SEATUNNEL_MASTER: CommonVars[String] =
+    CommonVars[String]("linkis.seatunnel.master", "master")
 
-  val LINKIS_FLINK_CONFIG: CommonVars[String] =
-    CommonVars[String]("linkis.flink.config", "config")
+  val LINKIS_SEATUNNEL_VARIABLE: CommonVars[String] =
+    CommonVars[String]("linkis.seatunnel.variable", "variable")
 
-  val LINKIS_FLINK_VARIABLE: CommonVars[String] =
-    CommonVars[String]("linkis.flink.variable", "variable")
+  val LINKIS_SEATUNNEL_CONFIG: CommonVars[String] =
+    CommonVars[String]("linkis.seatunnel.config", "config")
 
-  val LINKIS_FLINK_CHECK: CommonVars[String] = CommonVars[String]("linkis.flink.check", "check")
+  val LINKIS_SEATUNNEL_CLUSTER_NAME: CommonVars[String] =
+    CommonVars[String]("linkis.seatunnel.cluster", "cluster")
+
+  val GET_LINKIS_SEATUNNEL_MASTER = "--" + LINKIS_SEATUNNEL_MASTER.getValue
+  val GET_LINKIS_SEATUNNEL_VARIABLE = "--" + LINKIS_SEATUNNEL_VARIABLE.getValue
+  val GET_LINKIS_SEATUNNEL_CONFIG = "--" + LINKIS_SEATUNNEL_CONFIG.getValue
+  val GET_LINKIS_SEATUNNEL_CLUSTER_NAME = "--" + LINKIS_SEATUNNEL_CLUSTER_NAME.getValue
 
-  val GET_LINKIS_FLINK_RUNMODE = "--" + LINKIS_FLINK_RUNMODE.getValue
-  val GET_LINKIS_FLINK_CONFIG = "--" + LINKIS_FLINK_CONFIG.getValue
-  val GET_LINKIS_FLINK_VARIABLE = "--" + LINKIS_FLINK_VARIABLE.getValue
-  val GET_LINKIS_FLINK_CHECK = "--" + LINKIS_FLINK_CHECK.getValue
 }
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
index 2a0109d12..b9adf97d9 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
@@ -25,18 +25,20 @@ import org.apache.linkis.engineconn.once.executor.{
   OnceExecutorExecutionContext,
   OperableOnceExecutor
 }
-import org.apache.linkis.engineconnplugin.seatunnel.client.LinkisSeatunnelFlinkClient
+import org.apache.linkis.engineconnplugin.seatunnel.client.LinkisSeatunnelFlinkV2Client
 import org.apache.linkis.engineconnplugin.seatunnel.client.errorcode.SeatunnelErrorCodeSummary.EXEC_FLINK_CODE_ERROR
 import org.apache.linkis.engineconnplugin.seatunnel.client.exception.JobExecutionException
 import org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelEnvConfiguration
 import org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelFlinkEnvConfiguration.{
   GET_LINKIS_FLINK_CHECK,
   GET_LINKIS_FLINK_CONFIG,
-  GET_LINKIS_FLINK_RUNMODE,
+  GET_LINKIS_FLINK_DEPLOY_MODE,
+  GET_LINKIS_FLINK_MASTER,
   GET_LINKIS_FLINK_VARIABLE,
   LINKIS_FLINK_CHECK,
   LINKIS_FLINK_CONFIG,
-  LINKIS_FLINK_RUNMODE,
+  LINKIS_FLINK_DEPLOY_MODE,
+  LINKIS_FLINK_MASTER,
   LINKIS_FLINK_VARIABLE
 }
 import org.apache.linkis.engineconnplugin.seatunnel.context.SeatunnelEngineConnContext
@@ -79,6 +81,7 @@ class SeatunnelFlinkOnceCodeExecutor(
     val code: String = options(TaskConstant.CODE)
     params = onceExecutorExecutionContext.getOnceExecutorContent.getJobContent
       .asInstanceOf[util.Map[String, String]]
+    logger.info("flink doSubmit args:" + params)
     future = Utils.defaultScheduler.submit(new Runnable {
       override def run(): Unit = {
         logger.info("Try to execute codes." + code)
@@ -106,21 +109,24 @@ class SeatunnelFlinkOnceCodeExecutor(
     logger.info("Execute SeatunnelFlink Process")
 
     var args: Array[String] = Array.empty
-    val flinkRunMode = LINKIS_FLINK_RUNMODE.getValue
+    val flinkRunMode = LINKIS_FLINK_DEPLOY_MODE.getValue
     if (params != null && params.containsKey(flinkRunMode)) {
       val config = LINKIS_FLINK_CONFIG.getValue
       val variable = LINKIS_FLINK_VARIABLE.getValue
       val check = LINKIS_FLINK_CHECK.getValue
+      val master = LINKIS_FLINK_MASTER.getValue
 
       args = Array(
-        GET_LINKIS_FLINK_RUNMODE,
+        GET_LINKIS_FLINK_DEPLOY_MODE,
         params.getOrDefault(flinkRunMode, "run"),
         GET_LINKIS_FLINK_CHECK,
         params.getOrDefault(check, "false"),
+        GET_LINKIS_FLINK_MASTER,
+        params.getOrDefault(master, "local"),
         GET_LINKIS_FLINK_CONFIG,
         generateExecFile(code)
       )
-
+      logger.info("runCode args:" + args.mkString("Array(", ", ", ")"))
       if (params.containsKey(variable)) {
         val variableMap = GSON.fromJson(params.get(variable), classOf[util.HashMap[String, String]])
         variableMap.asScala.foreach(f => {
@@ -130,6 +136,7 @@ class SeatunnelFlinkOnceCodeExecutor(
 
     } else {
       args = localArray(code)
+      logger.info("runCode no args:" + args.mkString("Array(", ", ", ")"))
     }
     System.setProperty("SEATUNNEL_HOME", 
System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue))
     Files.createSymbolicLink(
@@ -137,7 +144,7 @@ class SeatunnelFlinkOnceCodeExecutor(
       new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
     )
     logger.info(s"Execute SeatunnelFlink Process end args:${args.mkString(" 
")}")
-    LinkisSeatunnelFlinkClient.main(args)
+    LinkisSeatunnelFlinkV2Client.main(args)
   }
 
   override protected def waitToRunning(): Unit = {
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
index b0cf6ece7..d938db412 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
@@ -24,7 +24,7 @@ import org.apache.linkis.engineconn.once.executor.{
   OnceExecutorExecutionContext,
   OperableOnceExecutor
 }
-import org.apache.linkis.engineconnplugin.seatunnel.client.LinkisSeatunnelSparkClient
+import org.apache.linkis.engineconnplugin.seatunnel.client.LinkisSeatunnelSparkV2Client
 import org.apache.linkis.engineconnplugin.seatunnel.client.errorcode.SeatunnelErrorCodeSummary.EXEC_SPARK_CODE_ERROR
 import org.apache.linkis.engineconnplugin.seatunnel.client.exception.JobExecutionException
 import org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelEnvConfiguration
@@ -116,7 +116,7 @@ class SeatunnelSparkOnceCodeExecutor(
       new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
     )
     logger.info(s"Execute SeatunnelSpark Process end args:${args.mkString(" 
")}")
-    LinkisSeatunnelSparkClient.main(args)
+    LinkisSeatunnelSparkV2Client.main(args)
   }
 
   override protected def waitToRunning(): Unit = {
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelZetaOnceCodeExecutor.scala
similarity index 78%
rename from linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
rename to linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelZetaOnceCodeExecutor.scala
index 784ec92a4..e4fd89198 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelZetaOnceCodeExecutor.scala
@@ -25,31 +25,17 @@ import org.apache.linkis.engineconn.once.executor.{
   OnceExecutorExecutionContext,
   OperableOnceExecutor
 }
-import org.apache.linkis.engineconnplugin.seatunnel.client.LinkisSeatunnelFlinkSQLClient
-import org.apache.linkis.engineconnplugin.seatunnel.client.errorcode.SeatunnelErrorCodeSummary.EXEC_FLINKSQL_CODE_ERROR
+import org.apache.linkis.engineconnplugin.seatunnel.client.LinkSeatunnelZetaClient
+import org.apache.linkis.engineconnplugin.seatunnel.client.errorcode.SeatunnelErrorCodeSummary.EXEC_SEATUNNEL_CODE_ERROR
 import org.apache.linkis.engineconnplugin.seatunnel.client.exception.JobExecutionException
 import org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelEnvConfiguration
-import org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelFlinkEnvConfiguration.{
-  GET_LINKIS_FLINK_CHECK,
-  GET_LINKIS_FLINK_CONFIG,
-  GET_LINKIS_FLINK_RUNMODE,
-  GET_LINKIS_FLINK_VARIABLE,
-  LINKIS_FLINK_CHECK,
-  LINKIS_FLINK_CONFIG,
-  LINKIS_FLINK_RUNMODE,
-  LINKIS_FLINK_VARIABLE
-}
+import org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelZetaEnvConfiguration._
 import org.apache.linkis.engineconnplugin.seatunnel.context.SeatunnelEngineConnContext
 import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils.{
   generateExecFile,
   localArray
 }
-import org.apache.linkis.manager.common.entity.resource.{
-  CommonNodeResource,
-  LoadInstanceResource,
-  NodeResource
-}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, NodeResource}
 import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.protocol.constants.TaskConstant
 import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -62,7 +48,7 @@ import java.util.concurrent.{Future, TimeUnit}
 
 import scala.collection.JavaConverters._
 
-class SeatunnelFlinkSQLOnceCodeExecutor(
+class SeatunnelZetaOnceCodeExecutor(
     override val id: Long,
     override protected val seatunnelEngineConnContext: SeatunnelEngineConnContext
 ) extends SeatunnelOnceExecutor
@@ -87,7 +73,7 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
           setResponse(
             ErrorExecuteResponse(
               "Run code failed!",
-              new JobExecutionException(EXEC_FLINKSQL_CODE_ERROR.getErrorDesc)
+              new JobExecutionException(EXEC_SEATUNNEL_CODE_ERROR.getErrorDesc)
             )
           )
           tryFailed()
@@ -103,28 +89,27 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
   }
 
   protected def runCode(code: String): Int = {
-    logger.info("Execute SeatunnelFlink Process")
+    logger.info("Execute SeaTunnelZeta Process")
 
     var args: Array[String] = Array.empty
-    val flinkRunMode = LINKIS_FLINK_RUNMODE.getValue
-    if (params != null && params.containsKey(flinkRunMode)) {
-      val config = LINKIS_FLINK_CONFIG.getValue
-      val variable = LINKIS_FLINK_VARIABLE.getValue
-      val check = LINKIS_FLINK_CHECK.getValue
-
+    if (params != null) {
+      val config = LINKIS_SEATUNNEL_CONFIG.getValue
+      val variable = LINKIS_SEATUNNEL_VARIABLE.getValue
+      val masterKey = LINKIS_SEATUNNEL_MASTER.getValue
+      val clusterName = LINKIS_SEATUNNEL_CLUSTER_NAME.getValue
       args = Array(
-        GET_LINKIS_FLINK_RUNMODE,
-        params.getOrDefault(flinkRunMode, "run"),
-        GET_LINKIS_FLINK_CHECK,
-        params.getOrDefault(check, "false"),
-        GET_LINKIS_FLINK_CONFIG,
+        GET_LINKIS_SEATUNNEL_MASTER,
+        params.getOrDefault(masterKey, "cluster"),
+        GET_LINKIS_SEATUNNEL_CLUSTER_NAME,
+        params.getOrDefault(clusterName, "linkis_seatunnel_cluster"),
+        GET_LINKIS_SEATUNNEL_CONFIG,
         generateExecFile(code)
       )
 
       if (params.containsKey(variable)) {
         val variableMap = GSON.fromJson(params.get(variable), classOf[util.HashMap[String, String]])
         variableMap.asScala.foreach(f => {
-          args ++ Array(GET_LINKIS_FLINK_VARIABLE, s"${f._1}=${f._2}")
+          args ++ Array(GET_LINKIS_SEATUNNEL_VARIABLE, s"${f._1}=${f._2}")
         })
       }
 
@@ -136,8 +121,8 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
       new File(System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) + "/seatunnel").toPath,
       new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
     )
-    logger.info(s"Execute SeatunnelFlinkSQL Process end args:${args.mkString(" 
")}")
-    LinkisSeatunnelFlinkSQLClient.main(args)
+    logger.info(s"Execute SeatunnelZeta Process end args:${args.mkString(" 
")}")
+    LinkSeatunnelZetaClient.main(args)
   }
 
   override protected def waitToRunning(): Unit = {
@@ -146,7 +131,7 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
         new Runnable {
           override def run(): Unit = {
             if (!(future.isDone || future.isCancelled)) {
-              logger.info("The SeatunnelFlinkSQL Process In Running")
+              logger.info("The Seatunnel Zeta Process In Running")
             }
           }
         },
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelEngineConnFactory.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelEngineConnFactory.scala
index 66bcdd9db..a71944a51 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelEngineConnFactory.scala
@@ -36,11 +36,12 @@ class SeatunnelEngineConnFactory extends MultiExecutorEngineConnFactory with Log
 
   override protected def getEngineConnType: EngineType = EngineType.SEATUNNEL
 
-  private val executorFactoryArray = Array[ExecutorFactory](
-    new SeatunnelSparkExecutorFactory,
-    new SeatunnelFlinkSQLExecutorFactory,
-    new SeatunnelFlinkExecutorFactory
-  )
+  private val executorFactoryArray =
+    Array[ExecutorFactory](
+      new SeatunnelSparkExecutorFactory,
+      new SeatunnelFlinkExecutorFactory,
+      new SeatunnelZetaExecutorFactory
+    )
 
   override protected def createEngineConnSession(
       engineCreationContext: EngineCreationContext
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelFlinkSQLExecutorFactory.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelZetaExecutorFactory.scala
similarity index 91%
rename from linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelFlinkSQLExecutorFactory.scala
rename to linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelZetaExecutorFactory.scala
index e5258f3d0..82aa36d4d 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelFlinkSQLExecutorFactory.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelZetaExecutorFactory.scala
@@ -22,14 +22,12 @@ import org.apache.linkis.engineconn.common.engineconn.EngineConn
 import org.apache.linkis.engineconn.once.executor.OnceExecutor
 import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory
 import org.apache.linkis.engineconnplugin.seatunnel.context.SeatunnelEngineConnContext
-import org.apache.linkis.engineconnplugin.seatunnel.executor.SeatunnelFlinkSQLOnceCodeExecutor
+import org.apache.linkis.engineconnplugin.seatunnel.executor.SeatunnelZetaOnceCodeExecutor
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.RunType
 import org.apache.linkis.manager.label.entity.engine.RunType.RunType
 
-class SeatunnelFlinkSQLExecutorFactory extends OnceExecutorFactory {
-
-  override protected def getRunType: RunType = RunType.SEATUNNEL_FLINK_SQL
+class SeatunnelZetaExecutorFactory extends OnceExecutorFactory {
 
   override protected def newExecutor(
       id: Int,
@@ -39,8 +37,9 @@ class SeatunnelFlinkSQLExecutorFactory extends OnceExecutorFactory {
   ): OnceExecutor = {
     engineConn.getEngineConnSession match {
       case context: SeatunnelEngineConnContext =>
-        new SeatunnelFlinkSQLOnceCodeExecutor(id, context)
+        new SeatunnelZetaOnceCodeExecutor(id, context)
     }
   }
 
+  override protected def getRunType: RunType = RunType.SEATUNNEL_ZETA
 }
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
index 8999e93a4..3296babac 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
+++ b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
@@ -24,6 +24,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.commons.logging.{Log, LogFactory}
 
 import java.io.{BufferedReader, File, InputStreamReader, PrintWriter}
+import java.lang.ProcessBuilder.Redirect
 
 object SeatunnelUtils {
   val LOGGER: Log = LogFactory.getLog(SeatunnelUtils.getClass)
@@ -48,14 +49,14 @@ object SeatunnelUtils {
     var bufferedReader: BufferedReader = null
     try {
       val processBuilder: ProcessBuilder = new ProcessBuilder(generateRunCode(code): _*)
+      val file = new File(
+        System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) + "/logs/yarnApp.log"
+      )
+      processBuilder.redirectErrorStream(true)
+      processBuilder.redirectOutput(Redirect.appendTo(file))
+      LOGGER.info("process ready start.")
       process = processBuilder.start()
-      bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream))
-      var line: String = null
-      while ({
-        line = bufferedReader.readLine(); line != null
-      }) {
-        LOGGER.info(line)
-      }
+      LOGGER.info(s"process start: $code")
       val exitcode = process.waitFor()
       exitcode
     } finally {
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/pom.xml b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/pom.xml
index 7c4da6950..1449c4855 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/pom.xml
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/pom.xml
@@ -28,7 +28,7 @@
 
   <properties>
     <postgresql.version>42.3.8</postgresql.version>
-    <clickhouse.version>0.3.2-patch11</clickhouse.version>
+    <clickhouse.version>0.4.6</clickhouse.version>
   </properties>
 
   <dependencies>
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 493b44629..705082a96 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -56,7 +56,7 @@ chill-java-0.7.6.jar
 chill_2.12-0.7.6.jar
 classgraph-4.1.7.jar
 classmate-1.5.1.jar
-clickhouse-jdbc-0.3.2-patch11.jar
+clickhouse-jdbc-0.4.6.jar
 commons-beanutils-1.9.4.jar
 commons-cli-1.3.1.jar
 commons-codec-1.10.jar
@@ -180,6 +180,7 @@ hadoop-yarn-api-3.3.4.jar
 hadoop-yarn-client-3.3.4.jar
 hadoop-yarn-common-3.3.4.jar
 hadoop-yarn-registry-3.3.4.jar
+hazelcast-5.1.jar
 hibernate-validator-5.1.2.Final.jar
 hibernate-validator-6.1.7.Final.jar
 hive-classification-3.1.3.jar
@@ -476,9 +477,21 @@ scala-reflect-2.12.17.jar
 scala-xml_2.12-2.1.0.jar
 scalap-2.12.17.jar
 scopt_2.12-3.5.0.jar
-seatunnel-core-flink-2.1.2.jar
-seatunnel-core-flink-sql-2.1.2.jar
-seatunnel-core-spark-2.1.2.jar
+jackson-dataformat-properties-2.13.4.jar
+jcl-over-slf4j-1.7.30.jar
+jcommander-1.81.jar
+seatunnel-api-2.3.1.jar
+seatunnel-common-2.3.1.jar
+seatunnel-config-base-2.3.1.jar
+seatunnel-config-shade-2.3.1.jar
+seatunnel-core-starter-2.3.1.jar
+seatunnel-engine-common-2.3.1.jar
+seatunnel-engine-core-2.3.1.jar
+seatunnel-flink-13-starter-2.3.1.jar
+seatunnel-jackson-2.3.1-optional.jar
+seatunnel-plugin-discovery-2.3.1.jar
+seatunnel-spark-2-starter-2.3.1.jar
+seatunnel-starter-2.3.1.jar
 security-0.191.jar
 security-0.193.jar
 servo-core-0.12.21.jar


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
