This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new 6c9316942 [WIP][Improve] custom-code job support dependencies (#3291)
6c9316942 is described below

commit 6c9316942682c7b56a4e15aef01548df207d6904
Author: benjobs <[email protected]>
AuthorDate: Thu Oct 26 18:45:57 2023 +0800

    [WIP][Improve] custom-code job support dependencies (#3291)
    
    * [Improve] custom-code job support dependencies
    
    * [Improve] build fatjar improvement
    
    * [improve] extractArguments improvement
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../streampark/common/util/PropertiesUtils.scala   | 43 +++++++++++++-
 .../common/util/PropertiesUtilsTestCase.scala      | 66 ++--------------------
 .../main/assembly/script/upgrade/mysql/2.1.1.sql   | 26 +++++++++
 .../main/assembly/script/upgrade/pgsql/2.1.1.sql   | 19 +++++++
 .../console/core/entity/Application.java           | 19 ++-----
 .../core/service/impl/AppBuildPipeServiceImpl.java | 36 +++++-------
 .../core/service/impl/ApplicationServiceImpl.java  | 33 ++++++++++-
 .../src/main/resources/db/data-h2.sql              |  2 +-
 .../src/main/resources/db/schema-h2.sql            |  1 +
 .../resources/mapper/core/ApplicationMapper.xml    |  1 +
 .../Application/src/AppDarkModeToggle.vue          |  4 +-
 .../src/components/ContextMenu/src/ContextMenu.vue |  4 +-
 .../src/components/Form/src/BasicForm.vue          |  2 +-
 .../src/components/Page/src/PageFooter.vue         |  4 +-
 .../src/hooks/web/useLockPage.ts                   |  9 +--
 .../streampark-console-webapp/src/utils/props.ts   |  2 +-
 .../src/views/flink/app/Add.vue                    | 25 ++++----
 .../src/views/flink/app/EditFlink.vue              | 30 +++++++++-
 .../src/views/flink/app/EditStreamPark.vue         |  2 +-
 .../flink/app/hooks/useCreateAndEditSchema.ts      | 16 ++++--
 .../src/views/flink/app/hooks/useCreateSchema.ts   | 12 +---
 .../views/flink/app/hooks/useEditFlinkSchema.ts    |  8 +++
 .../src/views/flink/app/hooks/useFlinkRender.tsx   |  2 +-
 .../flink/client/trait/FlinkClientTrait.scala      | 59 +------------------
 .../streampark/flink/packer/maven/MavenTool.scala  | 43 +++++++++-----
 .../flink/packer/pipeline/BuildRequest.scala       |  1 -
 .../pipeline/impl/FlinkRemoteBuildPipeline.scala   | 35 +++++-------
 27 files changed, 265 insertions(+), 239 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 544314801..2272cc5cf 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -30,7 +30,7 @@ import java.util.regex.Pattern
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{Map => MutableMap}
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
 
 object PropertiesUtils extends Logger {
 
@@ -305,6 +305,47 @@ object PropertiesUtils extends Logger {
     }
   }
 
+  @Nonnull def extractArguments(args: String): List[String] = {
+    val programArgs = new ArrayBuffer[String]()
+    if (StringUtils.isNotEmpty(args)) {
+      val array = args.split("\\s+")
+      val iter = array.iterator
+      def join(s: String, v: String): Unit = {
+        if (v.startsWith(s)) {
+          if (v.endsWith(s)) {
+            programArgs += v.replaceAll(s"^$s|$s$$", "")
+          } else {
+            var value = v
+            while (!value.endsWith(s) && iter.hasNext) {
+              value += s" ${iter.next()}"
+            }
+            programArgs += value.replaceAll(s"^$s|$s$$", "")
+          }
+        }
+      }
+
+      while (iter.hasNext) {
+        val v = iter.next()
+        if (v.startsWith("'")) {
+          if (v.endsWith("'")) {
+            programArgs += v.replaceAll("^'|'$", "")
+          } else {
+            join("'", v)
+          }
+        } else if (v.startsWith("\"")) {
+          if (v.endsWith("\"")) {
+            programArgs += v.replaceAll("^\"|\"$", "")
+          } else {
+            join("\"", v)
+          }
+        } else {
+          programArgs += v
+        }
+      }
+    }
+    programArgs.toList
+  }
+
   @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] =
     new JavaMap[String, String](extractDynamicProperties(properties).asJava)
 
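For reference, the new extractArguments splits the argument string on whitespace and re-joins tokens that fall inside single or double quotes, stripping the quote characters from the result. A minimal usage sketch (expected output inferred from the logic above, not taken from this patch):

    import org.apache.streampark.common.util.PropertiesUtils

    object ExtractArgumentsDemo extends App {
      // The double-quoted SQL survives the whitespace split as a single argument.
      val tokens = PropertiesUtils.extractArguments(
        "--host localhost:8123 --sql \"insert into t_a select * from t_b\" --x yyy")
      // List(--host, localhost:8123, --sql, insert into t_a select * from t_b, --x, yyy)
      tokens.foreach(println)
    }
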
diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index e04e0e3be..995cfd2ec 100644
--- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -16,75 +16,19 @@
  */
 package org.apache.streampark.common.util
 
-import org.apache.commons.lang3.StringUtils
 import org.junit.jupiter.api.{Assertions, Test}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.language.postfixOps
 
 class PropertiesUtilsTestCase {
 
   @Test def testExtractProgramArgs(): Unit = {
-    val argsStr = "--host localhost:8123\n\n\n" +
-      "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" +
-      "--c d\r\n" +
-      "--x yyy"
+    val args =
+      "mysql-sync-database \n--database employees \n--mysql-conf hostname=127.0.0.1 \n--mysql-conf port=3306 \n--mysql-conf username=root \n--mysql-conf password=123456 \n--mysql-conf database-name=employees \n--including-tables 'test|test.*' \n--sink-conf fenodes=127.0.0.1:8030 \n--sink-conf username=root \n--sink-conf password= \n--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \n--sink-conf sink.label-prefix=label\n--table-conf replication_num=1 "
     val programArgs = new ArrayBuffer[String]()
-    if (StringUtils.isNotEmpty(argsStr)) {
-      val multiLineChar = "\"\"\""
-      val array = argsStr.split("\\s+")
-      if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
-        array.foreach(programArgs +=)
-      } else {
-        val argsArray = new ArrayBuffer[String]()
-        val tempBuffer = new ArrayBuffer[String]()
-
-        def processElement(index: Int, multiLine: Boolean): Unit = {
-
-          if (index == array.length) {
-            if (tempBuffer.nonEmpty) {
-              argsArray += tempBuffer.mkString(" ")
-            }
-            return
-          }
-
-          val next = index + 1
-          val elem = array(index)
-
-          if (elem.trim.nonEmpty) {
-            if (!multiLine) {
-              if (elem.startsWith(multiLineChar)) {
-                tempBuffer += elem.drop(3)
-                processElement(next, true)
-              } else {
-                argsArray += elem
-                processElement(next, false)
-              }
-            } else {
-              if (elem.endsWith(multiLineChar)) {
-                tempBuffer += elem.dropRight(3)
-                argsArray += tempBuffer.mkString(" ")
-                tempBuffer.clear()
-                processElement(next, false)
-              } else {
-                tempBuffer += elem
-                processElement(next, multiLine)
-              }
-            }
-          } else {
-            tempBuffer += elem
-            processElement(next, false)
-          }
-        }
-
-        processElement(0, false)
-        argsArray.foreach(x => programArgs += x.trim)
-      }
-    }
-
-    Assertions.assertEquals("localhost:8123", programArgs(1))
-    Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3))
-    Assertions.assertEquals("d", programArgs(5))
-    Assertions.assertEquals("yyy", programArgs(7))
+    programArgs ++= PropertiesUtils.extractArguments(args)
+    println(programArgs)
   }
 
   @Test def testDynamicProperties(): Unit = {
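
The rewritten test above only prints the extracted tokens. A more assertive variant could pin the expected values down; a sketch, with expectations inferred from extractArguments' quote handling rather than copied from this patch:

    @Test def testExtractArgumentsQuoting(): Unit = {
      val tokens = PropertiesUtils.extractArguments(
        "--including-tables 'test|test.*' --sink-conf password=")
      // The single-quoted value is unwrapped; plain tokens split on whitespace.
      Assertions.assertEquals(
        List("--including-tables", "test|test.*", "--sink-conf", "password="), tokens)
    }
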
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.1.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.1.sql
new file mode 100644
index 000000000..2205a8fbb
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.1.sql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use streampark;
+
+set names utf8mb4;
+set foreign_key_checks = 0;
+
+alter table `t_flink_app`
+    add column `dependency` text collate utf8mb4_general_ci default null after `main_class`;
+
+set foreign_key_checks = 1;
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.1.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.1.sql
new file mode 100644
index 000000000..a87816bf4
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.1.sql
@@ -0,0 +1,19 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+alter table "public"."t_flink_app"
+    add column "dependency" text collate "pg_catalog"."default";
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 81f4ccc4c..568f7f2a5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -160,6 +160,9 @@ public class Application implements Serializable {
 
   private String mainClass;
 
+  @TableField(updateStrategy = FieldStrategy.IGNORED)
+  private String dependency;
+
   private Date startTime;
 
   @TableField(updateStrategy = FieldStrategy.IGNORED)
@@ -220,7 +223,6 @@ public class Application implements Serializable {
   /** running job */
   private transient JobsOverview.Task overview;
 
-  private transient String dependency;
   private transient Long sqlId;
   private transient String flinkSql;
 
@@ -390,15 +392,6 @@ public class Application implements Serializable {
         && this.cpFailureAction != null;
   }
 
-  public boolean eqFlinkJob(Application other) {
-    if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) {
-      if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
-        return this.getDependencyObject().eq(other.getDependencyObject());
-      }
-    }
-    return false;
-  }
-
   /** Local compilation and packaging working directory */
   @JsonIgnore
   public String getDistHome() {
@@ -752,11 +745,11 @@ public class Application implements Serializable {
 
     @Override
     public String toString() {
-      return groupId + ":" + artifactId + ":" + version + getClassifier(":");
+      return groupId + ":" + artifactId + ":" + version + getClassifier();
     }
 
-    private String getClassifier(String joiner) {
-      return StringUtils.isEmpty(classifier) ? "" : joiner + classifier;
+    private String getClassifier() {
+      return StringUtils.isEmpty(classifier) ? "" : ":" + classifier;
     }
   }
 }
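
The simplified getClassifier above always joins with ":", so a dependency coordinate renders as groupId:artifactId:version with an optional :classifier suffix. A standalone sketch of the same formatting (hypothetical helper, not the entity's code):

    // Mirrors the coordinate formatting of Application.Dependency's pom entries.
    def coordinate(groupId: String, artifactId: String, version: String, classifier: String): String = {
      val suffix = if (classifier == null || classifier.isEmpty) "" else s":$classifier"
      s"$groupId:$artifactId:$version$suffix"
    }
    // coordinate("org.apache.flink", "flink-connector-kafka", "1.14.4", "")
    //   == "org.apache.flink:flink-connector-kafka:1.14.4"
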
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 6f8890bd2..f6fb7e641 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -184,32 +184,24 @@ public class AppBuildPipeServiceImpl
 
             if (app.isCustomCodeJob()) {
               // customCode upload jar to appHome...
-              String appHome = app.getAppHome();
               FsOperator fsOperator = app.getFsOperator();
-              fsOperator.delete(appHome);
-              if (app.isUploadJob()) {
+              if (app.isCICDJob()) {
+                String appHome = app.getAppHome();
+                fsOperator.mkCleanDirs(appHome);
+                fsOperator.upload(app.getDistHome(), appHome);
+              } else {
                File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
                 // upload jar copy to appHome
                 String uploadJar = appUploads.concat("/").concat(app.getJar());
                checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads);
-                switch (app.getApplicationType()) {
-                  case STREAMPARK_FLINK:
-                    fsOperator.mkdirs(app.getAppLib());
-                    fsOperator.copy(uploadJar, app.getAppLib(), false, true);
-                    break;
-                  case APACHE_FLINK:
-                    fsOperator.mkdirs(appHome);
-                    fsOperator.copy(uploadJar, appHome, false, true);
-                    break;
-                  default:
-                    throw new IllegalArgumentException(
-                        "[StreamPark] unsupported ApplicationType of custom code: "
-                            + app.getApplicationType());
+                if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
+                  fsOperator.mkdirs(app.getAppLib());
+                  fsOperator.copy(uploadJar, app.getAppLib(), false, true);
                 }
-              } else {
-                fsOperator.upload(app.getDistHome(), appHome);
               }
-            } else {
+            }
+
+            if (app.isFlinkSqlJob() || app.isUploadJob()) {
               if (!app.getDependencyObject().getJar().isEmpty()) {
                 String localUploads = Workspace.local().APP_UPLOADS();
                 // copy jar to local upload dir
@@ -335,7 +327,8 @@ public class AppBuildPipeServiceImpl
     FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
     String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
     ExecutionMode executionMode = app.getExecutionModeEnum();
-    String mainClass = ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS();
+    String mainClass =
+        app.isCustomCodeJob() ? app.getMainClass() : ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS();
     switch (executionMode) {
       case YARN_APPLICATION:
         String yarnProvidedPath = app.getAppLib();
@@ -364,7 +357,6 @@ public class AppBuildPipeServiceImpl
                 app.getLocalAppHome(),
                 mainClass,
                 flinkUserJar,
-                app.isCustomCodeJob(),
                 app.getExecutionModeEnum(),
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
@@ -424,7 +416,7 @@ public class AppBuildPipeServiceImpl
           case STREAMPARK_FLINK:
            return String.format("%s/%s", app.getAppLib(), app.getModule().concat(".jar"));
           case APACHE_FLINK:
-            return String.format("%s/%s", app.getAppHome(), app.getJar());
+            return String.format("%s/%s", WebUtils.getAppTempDir(), app.getJar());
           default:
             throw new IllegalArgumentException(
                 "[StreamPark] unsupported ApplicationType of custom code: "
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 2242b897e..e23c12564 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -703,6 +703,17 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
   public boolean create(Application appParam) {
     ApiAlertException.throwIfNull(
        appParam.getTeamId(), "The teamId can't be null. Create application failed.");
+
+    if (appParam.isFlinkSqlJob()) {
+      appParam.setBuild(true);
+    } else {
+      if (appParam.isUploadJob()) {
+        appParam.setBuild(!appParam.getDependencyObject().isEmpty());
+      } else {
+        appParam.setBuild(false);
+      }
+    }
+
     appParam.setUserId(commonService.getUserId());
     appParam.setState(FlinkAppState.ADDED.getValue());
     appParam.setRelease(ReleaseState.NEED_RELEASE.get());
@@ -736,6 +747,17 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     }
   }
 
+  @Override
+  public boolean save(Application entity) {
+    String dependency = entity.getDependency();
+    if (entity.isFlinkSqlJob()) {
+      entity.setDependency(null);
+    }
+    boolean flag = super.save(entity);
+    entity.setDependency(dependency);
+    return flag;
+  }
+
   private boolean existsByJobName(String jobName) {
     return this.baseMapper.existsByJobName(jobName);
   }
@@ -836,8 +858,17 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
        String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(), appParam.getTeamId()));
 
     application.setRelease(ReleaseState.NEED_RELEASE.get());
+
     if (application.isUploadJob()) {
-      if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
+      Application.Dependency thisDependency =
+          Application.Dependency.toDependency(appParam.getDependency());
+      Application.Dependency targetDependency =
+          Application.Dependency.toDependency(application.getDependency());
+
+      if (!thisDependency.eq(targetDependency)) {
+        application.setDependency(appParam.getDependency());
+        application.setBuild(true);
+      } else if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
         application.setBuild(true);
       } else {
         File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
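
The update path above now flags an upload job for rebuild when either the declared dependency or the jar changes. The decision reduces to the following sketch, assuming a Dependency with structural equality (names are illustrative, not the console's API):

    case class Dependency(pom: Set[String] = Set.empty, jar: Set[String] = Set.empty) {
      def sameAs(other: Dependency): Boolean = pom == other.pom && jar == other.jar
    }

    // Rebuild when dependencies differ; otherwise fall back to comparing the jar name.
    def needBuild(oldDep: Dependency, newDep: Dependency, oldJar: String, newJar: String): Boolean =
      !oldDep.sameAs(newDep) || oldJar != newJar
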
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
index 77494579a..7a450f129 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
@@ -25,7 +25,7 @@ insert into `t_team` values (100001, 'test', 'The test team', now(), now());
 -- ----------------------------
 -- Records of t_flink_app
 -- ----------------------------
-insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, '0', 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test');
+insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, '0', 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test');
 
 -- ----------------------------
 -- Records of t_flink_effective
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 3f7025a83..d06f907e0 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -46,6 +46,7 @@ create table if not exists `t_flink_app` (
   `jar` varchar(255)  default null,
   `jar_check_sum` bigint default null,
   `main_class` varchar(255)  default null,
+  `dependency` text ,
   `args` text,
   `options` text,
   `hot_params` text ,
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index a6c97121c..9096158b5 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -40,6 +40,7 @@
         <result column="tracking" jdbcType="INTEGER" property="tracking"/>
         <result column="jar" jdbcType="VARCHAR" property="jar"/>
         <result column="jar_check_sum" jdbcType="VARCHAR" property="jarCheckSum"/>
+        <result column="dependency" jdbcType="LONGVARCHAR" property="dependency"/>
         <result column="main_class" jdbcType="VARCHAR" property="mainClass"/>
         <result column="job_id" jdbcType="VARCHAR" property="jobId"/>
         <result column="job_manager_url" jdbcType="VARCHAR" property="jobManagerUrl"/>
diff --git a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
index abf6c47ea..40b5c4964 100644
--- a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
@@ -79,9 +79,7 @@
       height: 18px;
       background-color: #fff;
       border-radius: 50%;
-      transition:
-        transform 0.5s,
-        background-color 0.5s;
+      transition: transform 0.5s, background-color 0.5s;
       will-change: transform;
     }
 
diff --git a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
index 61cc99b7f..122b6e711 100644
--- a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
+++ b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
@@ -195,9 +195,7 @@
     background-color: @component-background;
     border: 1px solid rgb(0 0 0 / 8%);
     border-radius: 0.25rem;
-    box-shadow:
-      0 2px 2px 0 rgb(0 0 0 / 14%),
-      0 3px 1px -2px rgb(0 0 0 / 10%),
+    box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%),
       0 1px 5px 0 rgb(0 0 0 / 6%);
     background-clip: padding-box;
     user-select: none;
diff --git a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
index ab4c10638..0169b4c86 100644
--- a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
@@ -129,7 +129,7 @@
       });
 
       const getBindValue = computed(
-        () => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable,
+        () => ({ ...attrs, ...props, ...unref(getProps) } as Recordable),
       );
 
       const getSchema = computed((): FormSchema[] => {
diff --git a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
index c499ccf65..153302646 100644
--- a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
@@ -55,9 +55,7 @@
     line-height: 44px;
     background-color: @component-background;
     border-top: 1px solid @border-color-base;
-    box-shadow:
-      0 -6px 16px -8px rgb(0 0 0 / 8%),
-      0 -9px 28px 0 rgb(0 0 0 / 5%),
+    box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 / 5%),
       0 -12px 48px 16px rgb(0 0 0 / 3%);
     transition: width 0.2s;
 
diff --git a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
index 47aa6eb5f..de0380811 100644
--- a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
+++ b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
@@ -48,12 +48,9 @@ export function useLockPage() {
     }
     clear();
 
-    timeId = setTimeout(
-      () => {
-        lockPage();
-      },
-      lockTime * 60 * 1000,
-    );
+    timeId = setTimeout(() => {
+      lockPage();
+    }, lockTime * 60 * 1000);
   }
 
   function lockPage(): void {
diff --git a/streampark-console/streampark-console-webapp/src/utils/props.ts b/streampark-console/streampark-console-webapp/src/utils/props.ts
index 5828c8cf7..92fe2810b 100644
--- a/streampark-console/streampark-console-webapp/src/utils/props.ts
+++ b/streampark-console/streampark-console-webapp/src/utils/props.ts
@@ -191,7 +191,7 @@ export const buildProps = <
       : never;
   };
 
-export const definePropType = <T>(val: any) => ({ [wrapperKey]: val }) as PropWrapper<T>;
+export const definePropType = <T>(val: any) => ({ [wrapperKey]: val } as PropWrapper<T>);
 
 export const keyOf = <T extends Object>(arr: T) => Object.keys(arr) as Array<keyof T>;
 export const mutable = <T extends readonly any[] | Record<string, unknown>>(val: T) =>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 541503f69..071231e0c 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -185,7 +185,7 @@
     // common params...
     const resourceFrom = values.resourceFrom;
     if (resourceFrom) {
-      if (resourceFrom === 'csv') {
+      if (resourceFrom === 'cvs') {
         params['resourceFrom'] = ResourceFromEnum.CICD;
         //streampark flink
         if (values.appType == AppTypeEnum.STREAMPARK_FLINK) {
@@ -210,15 +210,15 @@
           appType: AppTypeEnum.APACHE_FLINK,
           jar: unref(uploadJar),
           mainClass: values.mainClass,
+          dependency: await getDependency(),
         });
         handleCreateApp(params);
       }
     }
   }
-  /* flink sql mode */
-  async function handleSubmitSQL(values: Recordable) {
+  async function getDependency() {
     // Trigger a pom confirmation operation.
-    await unref(dependencyRef)?.handleApplyPom();
+    unref(dependencyRef)?.handleApplyPom();
     // common params...
     const dependency: { pom?: string; jar?: string } = {};
     const dependencyRecords = unref(dependencyRef)?.dependencyRecords;
@@ -233,14 +233,18 @@
         jar: unref(uploadJars),
       });
     }
-
+    return dependency.pom === undefined && dependency.jar === undefined
+      ? null
+      : JSON.stringify(dependency);
+  }
+  /* flink sql mode */
+  async function handleSubmitSQL(values: Recordable) {
     let config = values.configOverride;
-    if (config != null && config !== undefined && config.trim() != '') {
+    if (config != null && config.trim() != '') {
       config = encryptByBase64(config);
     } else {
       config = null;
     }
-
     handleCluster(values);
     const params = {
       jobType: JobTypeEnum.SQL,
@@ -248,10 +252,7 @@
       appType: AppTypeEnum.STREAMPARK_FLINK,
       config,
       format: values.isSetConfig ? 1 : null,
-      dependency:
-        dependency.pom === undefined && dependency.jar === undefined
-          ? null
-          : JSON.stringify(dependency),
+      dependency: await getDependency(),
     };
     handleSubmitParams(params, values, k8sTemplate);
     handleCreateApp(params);
@@ -285,7 +286,7 @@
     const param = {};
     for (const k in params) {
       const v = params[k];
-      if (v != null && v !== undefined) {
+      if (v != null) {
         param[k] = v;
       }
     }
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index 021354020..e75e0d544 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -39,6 +39,7 @@
   import VariableReview from './components/VariableReview.vue';
   import { useDrawer } from '/@/components/Drawer';
   import { ExecModeEnum, ResourceFromEnum } from '/@/enums/flinkEnum';
+  import Dependency from '/@/views/flink/app/components/Dependency.vue';
 
   const route = useRoute();
   const { t } = useI18n();
@@ -52,6 +53,7 @@
   const uploadJar = ref('');
   const programArgRef = ref();
   const podTemplateRef = ref();
+  const dependencyRef = ref();
 
   const k8sTemplate = reactive({
     podTemplate: '',
@@ -116,6 +118,7 @@
       setFieldsValue(defaultParams);
       app.args && programArgRef.value?.setContent(app.args);
       setTimeout(() => {
+        unref(dependencyRef)?.setDefaultValue(JSON.parse(app.dependency || '{}'));
         unref(podTemplateRef)?.handleChoicePodTemplate('ptVisual', app.k8sPodTemplate);
         unref(podTemplateRef)?.handleChoicePodTemplate('jmPtVisual', app.k8sJmPodTemplate);
         unref(podTemplateRef)?.handleChoicePodTemplate('tmPtVisual', app.k8sTmPodTemplate);
@@ -142,15 +145,34 @@
 
   /* Handling update parameters */
   function handleAppUpdate(values: Recordable) {
+    // Trigger a pom confirmation operation.
+    unref(dependencyRef)?.handleApplyPom();
+    // common params...
+    const dependency: { pom?: string; jar?: string } = {};
+    const dependencyRecords = unref(dependencyRef)?.dependencyRecords;
+    const uploadJars = unref(dependencyRef)?.uploadJars;
+    if (unref(dependencyRecords) && unref(dependencyRecords).length > 0) {
+      Object.assign(dependency, {
+        pom: unref(dependencyRecords),
+      });
+    }
+    if (uploadJars && unref(uploadJars).length > 0) {
+      Object.assign(dependency, {
+        jar: unref(uploadJars),
+      });
+    }
     submitLoading.value = true;
     try {
       const params = {
         id: app.id,
         jar: values.jar,
         mainClass: values.mainClass,
+        dependency:
+          dependency.pom === undefined && dependency.jar === undefined
+            ? null
+            : JSON.stringify(dependency),
       };
       handleSubmitParams(params, values, k8sTemplate);
-
       handleUpdateApp(params);
     } catch (error) {
       submitLoading.value = false;
@@ -212,13 +234,17 @@
       <template #args="{ model }">
         <ProgramArgs
           ref="programArgRef"
-          v-if="model.args != null && model.args != undefined"
+          v-if="model.args != null"
           v-model:value="model.args"
           :suggestions="suggestions"
           @preview="(value) => openReviewDrawer(true, { value, suggestions })"
         />
       </template>
 
+      <template #dependency="{ model, field }">
+        <Dependency ref="dependencyRef" v-model:value="model[field]" :form-model="model" />
+      </template>
+
       <template #formFooter>
         <div class="flex items-center w-full justify-center">
           <a-button @click="go('/flink/app')">
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index 0d346aa10..9e307ea24 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -360,7 +360,7 @@
       <template #args="{ model }">
         <ProgramArgs
           ref="programArgRef"
-          v-if="model.args != null && model.args != undefined"
+          v-if="model.args != null"
           v-model:value="model.args"
           :suggestions="suggestions"
           @preview="(value) => openReviewDrawer(true, { value, suggestions })"
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index a5cad757c..752ad30e0 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -49,7 +49,13 @@ import { fetchFlinkEnv } from '/@/api/flink/setting/flinkEnv';
 import { FlinkEnv } from '/@/api/flink/setting/types/flinkEnv.type';
 import { AlertSetting } from '/@/api/flink/setting/types/alert.type';
 import { FlinkCluster } from '/@/api/flink/setting/types/flinkCluster.type';
-import { AppTypeEnum, ClusterStateEnum, ExecModeEnum, JobTypeEnum } from '/@/enums/flinkEnum';
+import {
+  AppTypeEnum,
+  ClusterStateEnum,
+  ExecModeEnum,
+  JobTypeEnum,
+  ResourceFromEnum,
+} from '/@/enums/flinkEnum';
 import { isK8sExecMode } from '../utils';
 import { useI18n } from '/@/hooks/web/useI18n';
 import { fetchCheckHadoop } from '/@/api/flink/setting';
@@ -79,7 +85,7 @@ export const useCreateAndEditSchema = (
 
   const [registerConfDrawer, { openDrawer: openConfDrawer }] = useDrawer();
 
-  /* 
+  /*
   !The original item is also unassigned
   */
   function getConfigSchemas() {
@@ -126,9 +132,11 @@ export const useCreateAndEditSchema = (
         slot: 'dependency',
         ifShow: ({ values }) => {
           if (edit?.appId) {
-            return values.jobType == JobTypeEnum.SQL;
+            return values.jobType == JobTypeEnum.SQL
+              ? true
+              : values.resourceFrom == ResourceFromEnum.UPLOAD;
           } else {
-            return values?.jobType == 'sql';
+            return values?.jobType == 'sql' ? true : values?.resourceFrom != 'cvs';
           }
         },
       },
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
index 72b11762f..4ef2557b3 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
@@ -63,14 +63,6 @@ export const useCreateSchema = (dependencyRef: Ref) => {
     suggestions,
   } = useCreateAndEditSchema(dependencyRef);
 
-  // async function handleEditConfig(config: string) {
-  //   console.log('config', config);
-  //   const res = await fetchAppConf({ config });
-  //   const conf = decodeByBase64(res);
-  //   openConfDrawer(true, {
-  //     configOverride: conf,
-  //   });
-  // }
   function handleCheckConfig(_rule: RuleObject, value: StoreValue) {
     if (value) {
       const confType = getAppConfType(value);
@@ -97,7 +89,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
               if (value === 'sql') {
                 formModel.tableEnv = 1;
               } else {
-                formModel.resourceFrom = 'csv';
+                formModel.resourceFrom = 'cvs';
               }
             },
           };
@@ -109,7 +101,6 @@ export const useCreateSchema = (dependencyRef: Ref) => {
       },
       ...getExecutionModeSchema.value,
       ...getFlinkClusterSchemas.value,
-      ...getFlinkSqlSchema.value,
       {
         field: 'resourceFrom',
         label: t('flink.app.resourceFrom'),
@@ -133,6 +124,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
         ifShow: ({ values }) => values?.jobType !== 'sql' && values?.resourceFrom == 'upload',
         rules: [{ required: true, message: t('flink.app.addAppTips.mainClassIsRequiredMessage') }],
       },
+      ...getFlinkSqlSchema.value,
       {
         field: 'project',
         label: t('flink.app.project'),
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts
index aef6f6f81..c7e072ad8 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts
@@ -23,6 +23,8 @@ import { Alert } from 'ant-design-vue';
 import { useRoute } from 'vue-router';
 import { fetchMain } from '/@/api/flink/app/app';
 import { ResourceFromEnum } from '/@/enums/flinkEnum';
+import { useI18n } from '/@/hooks/web/useI18n';
+const { t } = useI18n();
 
 export const useEditFlinkSchema = (jars: Ref) => {
   const flinkSql = ref();
@@ -115,6 +117,12 @@ export const useEditFlinkSchema = (jars: Ref) => {
         },
         rules: [{ required: true, message: 'Program Main is required' }],
       },
+      {
+        field: 'dependency',
+        label: t('flink.app.dependency'),
+        component: 'Input',
+        slot: 'dependency',
+      },
       ...getFlinkFormOtherSchemas.value,
     ];
   });
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
index 4a103f81b..ba00e667a 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
@@ -514,7 +514,7 @@ export const renderResourceFrom = (model: Recordable) => {
       value={model.resourceFrom}
       placeholder="Please select resource from"
     >
-      <Select.Option value="csv">
+      <Select.Option value="cvs">
         <SvgIcon name="github" />
         <span class="pl-10px">CICD</span>
         <span class="gray">(build from CVS)</span>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index aa91aed32..e7da5525e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.`trait`
 import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode}
-import org.apache.streampark.common.util.{DeflaterUtils, Logger}
+import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -42,7 +42,6 @@ import org.apache.flink.util.Preconditions.checkNotNull
 import java.io.File
 import java.util.{Collections, List => JavaList, Map => JavaMap}
 
-import scala.annotation.tailrec
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -399,63 +398,9 @@ trait FlinkClientTrait extends Logger {
 
   private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
     val programArgs = new ArrayBuffer[String]()
-    val args = submitRequest.args
-
-    if (StringUtils.isNotEmpty(args)) {
-      val multiChar = "\""
-      val array = args.split("\\s+")
-      if (!array.exists(_.startsWith(multiChar))) {
-        array.foreach(programArgs +=)
-      } else {
-        val argsArray = new ArrayBuffer[String]()
-        val tempBuffer = new ArrayBuffer[String]()
-
-        @tailrec
-        def processElement(index: Int, multi: Boolean): Unit = {
-
-          if (index == array.length) {
-            if (tempBuffer.nonEmpty) {
-              argsArray += tempBuffer.mkString(" ")
-            }
-            return
-          }
-
-          val next = index + 1
-          val elem = array(index).trim
-
-          if (elem.isEmpty) {
-            processElement(next, multi = false)
-          } else {
-            if (multi) {
-              if (elem.endsWith(multiChar)) {
-                tempBuffer += elem.dropRight(1)
-                argsArray += tempBuffer.mkString(" ")
-                tempBuffer.clear()
-                processElement(next, multi = false)
-              } else {
-                tempBuffer += elem
-                processElement(next, multi)
-              }
-            } else {
-              val until = if (elem.endsWith(multiChar)) 1 else 0
-              if (elem.startsWith(multiChar)) {
-                tempBuffer += elem.drop(1).dropRight(until)
-                processElement(next, multi = true)
-              } else {
-                argsArray += elem.dropRight(until)
-                processElement(next, multi = false)
-              }
-            }
-          }
-        }
-
-        processElement(0, multi = false)
-        argsArray.foreach(x => programArgs += x)
-      }
-    }
+    programArgs ++= PropertiesUtils.extractArguments(submitRequest.args)
 
     if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
-
       programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml
       programArgs += PARAM_KEY_APP_NAME += DeflaterUtils.zipString(submitRequest.effectiveAppName)
       programArgs += PARAM_KEY_FLINK_PARALLELISM += getParallelism(submitRequest).toString
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 9c95748dd..5dab91892 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -46,7 +46,7 @@ import java.util
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 object MavenTool extends Logger {
 
@@ -76,9 +76,6 @@ object MavenTool extends Logger {
     List(remoteRepository)
   }
 
-  private val isJarFile = (file: File) =>
-    file.isFile && Try(Utils.checkJarFile(file.toURI.toURL)).isSuccess
-
   /**
    * Build a fat-jar with custom jar libraries.
    *
@@ -102,16 +99,34 @@ object MavenTool extends Logger {
     uberJar.delete()
     // resolve all jarLibs
     val jarSet = new util.HashSet[File]
-    jarLibs
-      .map(lib => new File(lib))
-      .filter(_.exists)
-      .foreach {
-        case libFile if isJarFile(libFile) => jarSet.add(libFile)
-        case libFile if libFile.isDirectory =>
-          libFile.listFiles.filter(isJarFile).foreach(jarSet.add)
-        case _ =>
-      }
-    logInfo(s"start shaded fat-jar: ${jarLibs.mkString(",")}")
+    jarLibs.foreach {
+      x =>
+        new File(x) match {
+          case jarFile if jarFile.exists() =>
+            if (jarFile.isFile) {
+              Try(Utils.checkJarFile(jarFile.toURI.toURL)) match {
+                case Success(_) => jarSet.add(jarFile)
+                case Failure(e) => logWarn(s"buildFatJar: error, ${e.getMessage}")
+              }
+            } else {
+              jarFile.listFiles.foreach(
+                jar => {
+                  if (jar.isFile) {
+                    Try(Utils.checkJarFile(jar.toURI.toURL)) match {
+                      case Success(_) => jarSet.add(jar)
+                      case Failure(e) =>
+                        logWarn(
+                          s"buildFatJar: directory [${jarFile.getAbsolutePath}], error: ${e.getMessage}")
+                    }
+                  }
+                })
+            }
+          case _ =>
+        }
+    }
+
+    logInfo(s"start shaded fat-jar: ${jarSet.mkString(",")}")
+
     // shade jars
     val shadeRequest = {
       val req = new ShadeRequest
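
The reworked loop above replaces the old isJarFile filter: every existing path expands to candidate files (a directory contributes its direct children), each candidate is validated with Utils.checkJarFile, and failures are logged instead of silently dropped. A condensed sketch of the same idea (hypothetical standalone helper, not the patch's code):

    import java.io.File
    import scala.util.{Failure, Success, Try}

    // Collect valid jars from a mix of file and directory paths.
    def collectJars(paths: Seq[String], checkJar: File => Unit): Set[File] =
      paths.map(new File(_)).filter(_.exists()).flatMap { f =>
        val candidates = if (f.isFile) Seq(f) else f.listFiles.toSeq.filter(_.isFile)
        candidates.flatMap { jar =>
          Try(checkJar(jar)) match {
            case Success(_) => Some(jar)
            case Failure(e) => println(s"skip invalid jar ${jar.getName}: ${e.getMessage}"); None
          }
        }
      }.toSet
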
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 41e6e4f46..62f90935a 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -107,7 +107,6 @@ case class FlinkRemotePerJobBuildRequest(
     workspace: String,
     mainClass: String,
     customFlinkUserJar: String,
-    skipBuild: Boolean,
     executionMode: ExecutionMode,
     developmentMode: DevelopmentMode,
     flinkVersion: FlinkVersion,
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
index d6889caa2..86c523fdd 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
@@ -31,28 +31,21 @@ class FlinkRemoteBuildPipeline(request: FlinkRemotePerJobBuildRequest) extends B
   /** The construction logic needs to be implemented by subclasses */
   @throws[Throwable]
   override protected def buildProcess(): ShadedBuildResponse = {
-    // create workspace.
-    // the sub workspace path like: APP_WORKSPACE/jobName
-    if (request.skipBuild) {
-      ShadedBuildResponse(request.workspace, request.customFlinkUserJar)
-    } else {
-      execStep(1) {
-        LfsOperator.mkCleanDirs(request.workspace)
-        logInfo(s"recreate building workspace: ${request.workspace}")
+    execStep(1) {
+      LfsOperator.mkCleanDirs(request.workspace)
+      logInfo(s"recreate building workspace: ${request.workspace}")
+    }.getOrElse(throw getError.exception)
+    // build flink job shaded jar
+    val shadedJar =
+      execStep(2) {
+        val output = MavenTool.buildFatJar(
+          request.mainClass,
+          request.providedLibs,
+          request.getShadedJarPath(request.workspace))
+        logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
+        output
       }.getOrElse(throw getError.exception)
-      // build flink job shaded jar
-      val shadedJar =
-        execStep(2) {
-          val output = MavenTool.buildFatJar(
-            request.mainClass,
-            request.providedLibs,
-            request.getShadedJarPath(request.workspace))
-          logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
-          output
-        }.getOrElse(throw getError.exception)
-      ShadedBuildResponse(request.workspace, shadedJar.getAbsolutePath)
-    }
-
+    ShadedBuildResponse(request.workspace, shadedJar.getAbsolutePath)
   }
 
 }
