This is an automated email from the ASF dual-hosted git repository.

kriszu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 279e2d696 [Improve] resource improvement (#2860)
279e2d696 is described below

commit 279e2d696fe96f0bc9cf709cbd8720e0fea78923
Author: benjobs <[email protected]>
AuthorDate: Sun Aug 6 13:14:58 2023 +0800

    [Improve] resource improvement (#2860)
---
 streampark-common/pom.xml                          |   2 +
 .../streampark/common/conf/ConfigConst.scala       |  12 +-
 .../streampark/common/util/ClassLoaderUtils.scala  |  15 +-
 .../streampark-console-service/pom.xml             |   9 +-
 .../main/assembly/script/schema/mysql-schema.sql   |  23 ++
 .../main/assembly/script/upgrade/mysql/2.2.0.sql   |  16 +-
 .../console/base/domain/RestRequest.java           |   2 +-
 .../console/base/domain/RestResponse.java          |   2 +-
 .../console/base/domain/router/RouterMeta.java     |   2 +-
 .../console/base/domain/router/VueRouter.java      |   2 +-
 .../streampark/console/base/util/CommonUtils.java  |   6 +-
 .../streampark/console/base/util/JacksonUtils.java |   2 -
 .../streampark/console/base/util/ObjectUtils.java  |  12 +-
 .../streampark/console/core/bean/Dependency.java   |  22 +-
 .../bean/FlinkConnectorResource.java}              |  23 +-
 .../apache/streampark/console/core/bean/Pom.java   |  11 +-
 .../core/controller/ApplicationController.java     |   5 +-
 .../core/controller/ResourceController.java        |  17 +-
 .../console/core/entity/Application.java           |  85 +----
 .../streampark/console/core/entity/FlinkSql.java   |   8 +-
 .../streampark/console/core/entity/Resource.java   |  15 +-
 .../streampark/console/core/entity/Variable.java   |   2 +-
 .../console/core/service/ApplicationService.java   |   3 -
 .../console/core/service/ResourceService.java      |   9 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java |   5 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  89 ++---
 .../core/service/impl/ResourceServiceImpl.java     | 376 ++++++++++++++++++---
 .../console/system/authentication/JWTToken.java    |   2 +-
 .../console/system/entity/AccessToken.java         |   2 +-
 .../streampark/console/system/entity/Member.java   |   2 +-
 .../streampark/console/system/entity/Menu.java     |   2 +-
 .../streampark/console/system/entity/Role.java     |   2 +-
 .../streampark/console/system/entity/RoleMenu.java |   2 +-
 .../streampark/console/system/entity/SysLog.java   |   2 +-
 .../streampark/console/system/entity/Team.java     |   2 +-
 .../streampark/console/system/entity/User.java     |   2 +-
 .../src/main/resources/db/schema-h2.sql            |   3 +
 .../main/resources/mapper/core/ResourceMapper.xml  |   3 +
 .../console/base/util/DependencyUtilsTest.java     | 151 +++++++++
 .../core/service/ApplicationServiceTest.java       |  29 --
 .../console/core/service/ResourceServiceTest.java  |  61 ++++
 .../src/api/flink/resource/index.ts                |  18 +
 .../src/api/flink/resource/model/resourceModel.ts  |   1 +
 .../src/locales/lang/en/flink/resource.ts          |  13 +-
 .../src/locales/lang/zh-CN/flink/resource.ts       |  13 +-
 .../src/views/flink/app/styles/Add.less            |  21 +-
 .../src/views/flink/resource/View.vue              |  88 +++--
 .../views/flink/resource/components/Resource.vue   |  21 +-
 .../flink/resource/components/ResourceDrawer.vue   | 124 +++++--
 .../src/views/flink/resource/useResourceRender.tsx |  37 +-
 .../doris/bean/LoadStatusFailedException.java      |   2 +-
 .../streampark/flink/packer/maven/MavenTool.scala  |  18 +-
 .../impl/FlinkYarnApplicationBuildPipeline.scala   |   4 +-
 .../streampark/flink/packer/MavenToolSpec.scala    |   1 +
 54 files changed, 1012 insertions(+), 389 deletions(-)

diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index 64694d942..32babf65c 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -115,6 +115,7 @@
             <optional>true</optional>
         </dependency>
 
+
         <!--logback -->
         <dependency>
             <groupId>org.apache.streampark</groupId>
@@ -128,6 +129,7 @@
             <version>${streampark.shaded.version}</version>
         </dependency>
 
+
         <!-- ZIO -->
         <dependency>
             <groupId>dev.zio</groupId>
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 9f39a914c..75c0b4e9c 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -80,18 +80,16 @@ object ConfigConst {
   val KEY_SPARK_BATCH_DURATION = "spark.batch.duration"
 
   // flink
-  def KEY_APP_CONF(prefix: String = null): String = if (prefix == null) "conf" 
else s"${prefix}conf"
+  def KEY_APP_CONF(prefix: String = null): String = 
s"${Option(prefix).getOrElse("")}conf"
 
-  def KEY_FLINK_CONF(prefix: String = null): String =
-    if (prefix == null) "flink.conf" else s"${prefix}flink.conf"
+  def KEY_FLINK_CONF(prefix: String = null): String = 
s"${Option(prefix).getOrElse("")}flink.conf"
 
-  def KEY_APP_NAME(prefix: String = null): String =
-    if (prefix == null) "app.name" else s"${prefix}app.name"
+  def KEY_APP_NAME(prefix: String = null): String = 
s"${Option(prefix).getOrElse("")}app.name"
 
-  def KEY_FLINK_SQL(prefix: String = null): String = if (prefix == null) "sql" 
else s"${prefix}sql"
+  def KEY_FLINK_SQL(prefix: String = null): String = 
s"${Option(prefix).getOrElse("")}sql"
 
   def KEY_FLINK_PARALLELISM(prefix: String = null): String =
-    if (prefix == null) "parallelism.default" else 
s"${prefix}parallelism.default"
+    s"${Option(prefix).getOrElse("")}parallelism.default"
 
   val KEY_FLINK_OPTION_PREFIX = "flink.option."
 
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
index 620f680e7..eb3f3dadc 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
@@ -16,9 +16,11 @@
  */
 package org.apache.streampark.common.util
 
-import java.io.File
+import java.io.{File, IOException}
 import java.net.{URL, URLClassLoader}
-import java.util.function.Supplier
+import java.util.function.{Consumer, Supplier}
+
+import scala.collection.mutable.ArrayBuffer
 
 object ClassLoaderUtils extends Logger {
 
@@ -61,6 +63,15 @@ object ClassLoaderUtils extends Logger {
       Thread.currentThread.setContextClassLoader(originalClassLoader)
     }
   }
+  @throws[IOException]
+  def cloneClassLoader(): ClassLoader = {
+    val urls = originalClassLoader.getResources(".")
+    val buffer = ArrayBuffer[URL]()
+    while (urls.hasMoreElements) {
+      buffer += urls.nextElement()
+    }
+    new URLClassLoader(buffer.toArray[URL], originalClassLoader)
+  }
 
   def loadJar(jarFilePath: String): Unit = {
     val jarFile = new File(jarFilePath)
diff --git a/streampark-console/streampark-console-service/pom.xml 
b/streampark-console/streampark-console-service/pom.xml
index f88f57aa3..4202d7c96 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -380,6 +380,7 @@
             <version>${project.version}</version>
         </dependency>
 
+
         <dependency>
             <groupId>com.fasterxml.jackson.module</groupId>
             
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
@@ -436,11 +437,17 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>dev.zio</groupId>
             <artifactId>zio-logging_${scala.binary.version}</artifactId>
         </dependency>
-
+      
         <dependency>
             <groupId>dev.zio</groupId>
             <artifactId>zio-streams_${scala.binary.version}</artifactId>
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 7783e6c66..52b885c7d 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -531,4 +531,27 @@ create table `t_yarn_queue` (
    unique key `unq_team_id_queue_label` (`team_id`, `queue_label`) using btree
 ) engine = innodb default charset = utf8mb4 collate = utf8mb4_general_ci;
 
+
+
+-- ----------------------------
+-- Table of t_resource
+-- ----------------------------
+drop table if exists `t_resource`;
+create table if not exists `t_resource` (
+`id` bigint not null auto_increment primary key,
+`resource_name` varchar(128) not null comment 'The name of the resource',
+`resource_type` int not null comment '0:app 1:common 2:connector 3:format 
4:udf',
+`resource_path` varchar(255) default null,
+`resource` text,
+`engine_type` int not null comment 'compute engine type, 0:apache flink 
1:apache spark',
+`main_class` varchar(255) default null,
+`description` text default null comment 'More detailed description of 
resource',
+`creator_id` bigint not null comment 'user id of creator',
+`connector_required_options` text default null,
+`connector_optional_options` text default null,
+`team_id` bigint not null comment 'team id',
+`create_time` datetime not null default current_timestamp comment 'create 
time',
+`modify_time` datetime not null default current_timestamp on update 
current_timestamp comment 'modify time'
+);
+
 set foreign_key_checks = 1;
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 757ac6cd7..f428f9216 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -76,14 +76,14 @@ alter table `t_user` modify column `login_type` tinyint 
default 0 comment 'login
 -- ----------------------------
 drop table if exists `t_flink_gateway`;
 create table `t_flink_gateway` (
-                                `id` bigint not null auto_increment,
-                                `gateway_name` varchar(128) collate 
utf8mb4_general_ci not null comment 'The name of the gateway',
-                                `description` text collate utf8mb4_general_ci 
default null comment 'More detailed description of resource',
-                                `gateway_type` int not null comment 'The type 
of the gateway',
-                                `address` varchar(150) default null comment 
'url address of gateway endpoint',
-                                `create_time` datetime not null default 
current_timestamp comment 'create time',
-                                `modify_time` datetime not null default 
current_timestamp on update current_timestamp comment 'modify time',
-                                primary key (`id`) using btree
+`id` bigint not null auto_increment,
+`gateway_name` varchar(128) collate utf8mb4_general_ci not null comment 'The 
name of the gateway',
+`description` text collate utf8mb4_general_ci default null comment 'More 
detailed description of resource',
+`gateway_type` int not null comment 'The type of the gateway',
+`address` varchar(150) default null comment 'url address of gateway endpoint',
+`create_time` datetime not null default current_timestamp comment 'create 
time',
+`modify_time` datetime not null default current_timestamp on update 
current_timestamp comment 'modify time',
+primary key (`id`) using btree
 ) engine=innodb auto_increment=100000 default charset=utf8mb4 
collate=utf8mb4_general_ci;
 
 -- menu level 2
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java
index 42fe5084a..753458404 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
 @Data
 public class RestRequest implements Serializable {
 
-  private static final long serialVersionUID = -4869594085374385813L;
+  private static final long serialVersionUID = 1L;
 
   @Schema(example = "10", required = true)
   private int pageSize = 10;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
index f15fdc410..e5a2610e3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java
@@ -24,7 +24,7 @@ public class RestResponse extends HashMap<String, Object> {
   public static final String STATUS_SUCCESS = "success";
   public static final String STATUS_FAIL = "error";
 
-  private static final long serialVersionUID = -8713837118340960775L;
+  private static final long serialVersionUID = 1L;
 
   public static RestResponse success(Object data) {
     RestResponse resp = new RestResponse();
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java
index 5753b6f64..1939e99db 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class RouterMeta implements Serializable {
 
-  private static final long serialVersionUID = 5499925008927195914L;
+  private static final long serialVersionUID = 1L;
 
   private Boolean closeable;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java
index 45688413c..da3dffd04 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java
@@ -30,7 +30,7 @@ import java.util.List;
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class VueRouter<T> implements Serializable {
 
-  private static final long serialVersionUID = -3327478146308500708L;
+  private static final long serialVersionUID = 1L;
 
   @JsonIgnore private String id;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
index 5b80ba623..04b90c621 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java
@@ -47,7 +47,7 @@ public final class CommonUtils implements Serializable {
 
   private CommonUtils() {}
 
-  private static final long serialVersionUID = 6458428317155311192L;
+  private static final long serialVersionUID = 1L;
 
   private static final String OS = System.getProperty("os.name").toLowerCase();
 
@@ -199,7 +199,7 @@ public final class CommonUtils implements Serializable {
     if (iterator != null) {
       while (iterator.hasNext()) {
         Object candidate = iterator.next();
-        if (ObjectUtils.safeEquals(candidate, element)) {
+        if (ObjectUtils.equals(candidate, element)) {
           return true;
         }
       }
@@ -218,7 +218,7 @@ public final class CommonUtils implements Serializable {
     if (enumeration != null) {
       while (enumeration.hasMoreElements()) {
         Object candidate = enumeration.nextElement();
-        if (ObjectUtils.safeEquals(candidate, element)) {
+        if (ObjectUtils.equals(candidate, element)) {
           return true;
         }
       }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
index 2d99c7bbf..8fe562759 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.base.util;
 
 import org.apache.streampark.common.util.DateUtils;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -41,7 +40,6 @@ public final class JacksonUtils {
     MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
     MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
-    MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
     MAPPER.setDateFormat(new SimpleDateFormat(DateUtils.fullFormat()));
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
index f375dad52..c16d2a42f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java
@@ -104,7 +104,7 @@ public final class ObjectUtils {
       return false;
     }
     for (Object arrayEle : array) {
-      if (safeEquals(arrayEle, element)) {
+      if (equals(arrayEle, element)) {
         return true;
       }
     }
@@ -238,7 +238,7 @@ public final class ObjectUtils {
    * @return whether the given objects are equal
    * @see Arrays#equals
    */
-  public static boolean safeEquals(Object o1, Object o2) {
+  public static boolean equals(Object o1, Object o2) {
     if (o1 == null || o2 == null) {
       return false;
     }
@@ -282,8 +282,8 @@ public final class ObjectUtils {
     return false;
   }
 
-  public static boolean safeTrimEquals(Object o1, Object o2) {
-    boolean equals = safeEquals(o1, o2);
+  public static boolean trimEquals(Object o1, Object o2) {
+    boolean equals = equals(o1, o2);
     if (!equals) {
       if (o1 != null && o2 != null) {
         if (o1 instanceof String && o2 instanceof String) {
@@ -294,6 +294,10 @@ public final class ObjectUtils {
     return equals;
   }
 
+  public static boolean trimNoEquals(Object o1, Object o2) {
+    return !trimEquals(o1, o2);
+  }
+
   /**
    * Return as hash code for the given object; typically the value of <code>
    * {@link Object#hashCode()}</code>. If the object is an array, this method 
will delegate to any
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
index 6e15e62f5..413a2299a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
@@ -52,7 +52,7 @@ public class Dependency {
     return pom.isEmpty() && jar.isEmpty();
   }
 
-  public boolean eq(Dependency other) {
+  public boolean equals(Dependency other) {
     if (other == null) {
       return false;
     }
@@ -76,20 +76,20 @@ public class Dependency {
   }
 
   public DependencyInfo toJarPackDeps() {
-    List<Artifact> mvnArts =
-        this.pom.stream()
-            .map(
-                pom ->
-                    new Artifact(
-                        pom.getGroupId(),
-                        pom.getArtifactId(),
-                        pom.getVersion(),
-                        pom.getClassifier()))
-            .collect(Collectors.toList());
+    List<Artifact> mvnArts = toArtifact();
     List<String> extJars =
         this.jar.stream()
             .map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
             .collect(Collectors.toList());
     return new DependencyInfo(mvnArts, extJars);
   }
+
+  public List<Artifact> toArtifact() {
+    return this.pom.stream()
+        .map(
+            pom ->
+                new Artifact(
+                    pom.getGroupId(), pom.getArtifactId(), pom.getVersion(), 
pom.getClassifier()))
+        .collect(Collectors.toList());
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkConnectorResource.java
similarity index 63%
copy from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
copy to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkConnectorResource.java
index e2f28c6b2..b6dc9db68 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkConnectorResource.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.system.entity;
+package org.apache.streampark.console.core.bean;
 
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
 import lombok.Data;
 
-import java.io.Serializable;
+import java.util.Map;
 
-@TableName("t_role_menu")
 @Data
-public class RoleMenu implements Serializable {
-
-  private static final long serialVersionUID = -7573904024872252113L;
-
-  @TableId(type = IdType.AUTO)
-  private Long id;
-
-  private Long roleId;
-
-  private Long menuId;
+public class FlinkConnectorResource {
+  private String className;
+  private String factoryIdentifier;
+  Map<String, String> requiredOptions;
+  Map<String, String> optionalOptions;
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java
index da10a529d..479195915 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java
@@ -48,10 +48,11 @@ public class Pom {
 
   @Override
   public String toString() {
-    return groupId + ":" + artifactId + ":" + version + getClassifier(":");
-  }
-
-  private String getClassifier(String joiner) {
-    return StringUtils.isEmpty(classifier) ? "" : joiner + classifier;
+    return String.format(
+        "%s:%s:%s%s",
+        groupId,
+        artifactId,
+        version,
+        StringUtils.isEmpty(classifier) ? "" : ":".concat(classifier));
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 12486ef8a..0f9825f3f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -36,6 +36,7 @@ import 
org.apache.streampark.console.core.service.AppBuildPipeService;
 import org.apache.streampark.console.core.service.ApplicationBackUpService;
 import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.ApplicationService;
+import org.apache.streampark.console.core.service.ResourceService;
 import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
@@ -80,6 +81,8 @@ public class ApplicationController {
 
   @Autowired private AppBuildPipeService appBuildPipeService;
 
+  @Autowired private ResourceService resourceService;
+
   @Operation(summary = "Get application")
   @ApiAccess
   @PostMapping("get")
@@ -397,7 +400,7 @@ public class ApplicationController {
   @PostMapping("upload")
   @RequiresPermissions("app:create")
   public RestResponse upload(MultipartFile file) throws Exception {
-    String uploadPath = applicationService.upload(file);
+    String uploadPath = resourceService.upload(file);
     return RestResponse.success(uploadPath);
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
index 27c95b555..a4608d134 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
@@ -36,6 +36,7 @@ import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.multipart.MultipartFile;
 
 import javax.validation.Valid;
 
@@ -53,11 +54,17 @@ public class ResourceController {
   @Operation(summary = "add resource")
   @PostMapping("add")
   @RequiresPermissions("resource:add")
-  public RestResponse addResource(@Valid Resource resource) {
+  public RestResponse addResource(@Valid Resource resource) throws Exception {
     this.resourceService.addResource(resource);
     return RestResponse.success();
   }
 
+  @Operation(summary = "check resource")
+  @PostMapping("check")
+  public RestResponse checkResource(@Valid Resource resource) throws Exception 
{
+    return this.resourceService.checkResource(resource);
+  }
+
   @Operation(summary = "List resources")
   @PostMapping("page")
   public RestResponse page(RestRequest restRequest, Resource resource) {
@@ -87,4 +94,12 @@ public class ResourceController {
     List<Resource> resourceList = resourceService.findByTeamId(teamId);
     return RestResponse.success(resourceList);
   }
+
+  @Operation(summary = "Upload the resource jar")
+  @PostMapping("upload")
+  @RequiresPermissions("resource:add")
+  public RestResponse upload(MultipartFile file) throws Exception {
+    String uploadPath = resourceService.upload(file);
+    return RestResponse.success(uploadPath);
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 4fc0cf147..7bbeffccb 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -27,7 +27,6 @@ import 
org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.base.util.ObjectUtils;
 import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.bean.Dependency;
 import org.apache.streampark.console.core.enums.FlinkAppState;
@@ -252,14 +251,6 @@ public class Application implements Serializable {
     return ingressTemplate;
   }
 
-  public void setIngressTemplate(String ingressTemplate) {
-    this.ingressTemplate = ingressTemplate;
-  }
-
-  public String getDefaultModeIngress() {
-    return defaultModeIngress;
-  }
-
   public void setDefaultModeIngress(String defaultModeIngress) {
     this.defaultModeIngress = defaultModeIngress;
   }
@@ -357,11 +348,6 @@ public class Application implements Serializable {
     return DevelopmentMode.of(jobType);
   }
 
-  @JsonIgnore
-  public void setDevelopmentMode(DevelopmentMode mode) {
-    this.jobType = mode.getValue();
-  }
-
   @JsonIgnore
   public FlinkAppState getFlinkAppStateEnum() {
     return FlinkAppState.of(state);
@@ -386,7 +372,7 @@ public class Application implements Serializable {
   public boolean eqFlinkJob(Application other) {
     if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) {
       if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
-        return this.getDependencyObject().eq(other.getDependencyObject());
+        return this.getDependencyObject().equals(other.getDependencyObject());
       }
     }
     return false;
@@ -509,75 +495,6 @@ public class Application implements Serializable {
     return false;
   }
 
-  /**
-   * Parameter comparison, mainly to compare whether the parameters related to 
Flink runtime have
-   * changed
-   */
-  public boolean eqJobParam(Application other) {
-    // 1) Resolve Order has it changed
-    // 2) flink Version has it changed
-    // 3) Execution Mode has it changed
-    // 4) Parallelism has it changed
-    // 5) Task Slots has it changed
-    // 6) Options has it changed
-    // 7) properties has it changed
-    // 8) Program Args has it changed
-    // 9) Flink Version  has it changed
-
-    if (!ObjectUtils.safeEquals(this.getVersionId(), other.getVersionId())) {
-      return false;
-    }
-
-    if (!ObjectUtils.safeEquals(this.getResolveOrder(), 
other.getResolveOrder())
-        || !ObjectUtils.safeEquals(this.getExecutionMode(), 
other.getExecutionMode())
-        || !ObjectUtils.safeEquals(this.getK8sRestExposedType(), 
other.getK8sRestExposedType())) {
-      return false;
-    }
-
-    if (this.getOptions() != null) {
-      if (other.getOptions() != null) {
-        if (!this.getOptions().trim().equals(other.getOptions().trim())) {
-          Map<String, Object> optMap = this.getOptionMap();
-          Map<String, Object> otherMap = other.getOptionMap();
-          if (optMap.size() != otherMap.size()) {
-            return false;
-          }
-          for (Map.Entry<String, Object> entry : optMap.entrySet()) {
-            if (!entry.getValue().equals(otherMap.get(entry.getKey()))) {
-              return false;
-            }
-          }
-        }
-      } else {
-        return false;
-      }
-    } else if (other.getOptions() != null) {
-      return false;
-    }
-
-    if (this.getDynamicProperties() != null) {
-      if (other.getDynamicProperties() != null) {
-        if 
(!this.getDynamicProperties().trim().equals(other.getDynamicProperties().trim()))
 {
-          return false;
-        }
-      } else {
-        return false;
-      }
-    } else if (other.getDynamicProperties() != null) {
-      return false;
-    }
-
-    if (this.getArgs() != null) {
-      if (other.getArgs() != null) {
-        return this.getArgs().trim().equals(other.getArgs().trim());
-      } else {
-        return false;
-      }
-    } else {
-      return other.getArgs() == null;
-    }
-  }
-
   @JsonIgnore
   public StorageType getStorageType() {
     return getStorageType(getExecutionMode());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
index cb3fc4f97..60fc07dbd 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
@@ -18,7 +18,6 @@
 package org.apache.streampark.console.core.entity;
 
 import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.console.base.util.ObjectUtils;
 import org.apache.streampark.console.core.bean.Dependency;
 import org.apache.streampark.console.core.enums.ChangedType;
 
@@ -30,6 +29,7 @@ import lombok.Data;
 
 import java.util.Base64;
 import java.util.Date;
+import java.util.Objects;
 
 @Data
 @TableName("t_flink_sql")
@@ -89,10 +89,10 @@ public class FlinkSql {
     // 2) determine if dependency has changed
     Dependency thisDependency = Dependency.toDependency(this.getDependency());
     Dependency targetDependency = 
Dependency.toDependency(target.getDependency());
-    boolean depDifference = !thisDependency.eq(targetDependency);
+    boolean depDifference = !thisDependency.equals(targetDependency);
     // 3) determine if team resource has changed
-    boolean teamResDifference =
-        !ObjectUtils.safeEquals(this.teamResource, target.getTeamResource());
+    boolean teamResDifference = !Objects.equals(this.teamResource, 
target.getTeamResource());
+
     if (sqlDifference && depDifference && teamResDifference) {
       return ChangedType.ALL;
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
index ac5eea6a5..22a1fb3e0 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
@@ -35,13 +35,17 @@ import java.util.Date;
 @TableName("t_resource")
 public class Resource implements Serializable {
 
-  private static final long serialVersionUID = -7720746591258904369L;
+  private static final long serialVersionUID = 1L;
 
   @TableId(type = IdType.AUTO)
   private Long id;
 
+  // resourceName unique
   private String resourceName;
 
+  // resource path
+  private String resourcePath;
+
   private String resource;
 
   @Size(max = 100, message = "{noMoreThan}")
@@ -54,8 +58,15 @@ public class Resource implements Serializable {
 
   private EngineType engineType;
 
+  // for flink app
   private String mainClass;
 
+  // for flink connector
+  private String connectorRequiredOptions;
+
+  // for flink connector
+  private String connectorOptionalOptions;
+
   /** user name of creator */
   private transient String creatorName;
 
@@ -69,4 +80,6 @@ public class Resource implements Serializable {
   private transient String sortField;
 
   private transient String sortOrder;
+
+  private transient String connector;
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
index 75571f53a..458c74d3d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
@@ -35,7 +35,7 @@ import java.util.Date;
 @TableName("t_variable")
 public class Variable implements Serializable {
 
-  private static final long serialVersionUID = -7720746591258904369L;
+  private static final long serialVersionUID = 1L;
 
   @TableId(type = IdType.AUTO)
   private Long id;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index cba588c65..8e51c155f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -25,7 +25,6 @@ import 
org.apache.streampark.console.core.enums.AppExistsState;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.IService;
-import org.springframework.web.multipart.MultipartFile;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -75,8 +74,6 @@ public interface ApplicationService extends 
IService<Application> {
 
   Map<String, Serializable> dashboard(Long teamId);
 
-  String upload(MultipartFile file) throws Exception;
-
   /** set the latest to Effective, it will really become the current effective 
*/
   void toEffective(Application application);
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
index d59df4a59..4a4781911 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
@@ -18,11 +18,14 @@
 package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.core.entity.Resource;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.IService;
+import org.springframework.web.multipart.MultipartFile;
 
+import java.io.IOException;
 import java.util.List;
 
 public interface ResourceService extends IService<Resource> {
@@ -49,7 +52,7 @@ public interface ResourceService extends IService<Resource> {
    *
    * @param resource resource
    */
-  void addResource(Resource resource);
+  void addResource(Resource resource) throws Exception;
 
   /**
    * @param teamId team id
@@ -87,4 +90,8 @@ public interface ResourceService extends IService<Resource> {
    * @param targetUserId target user id
    */
   void changeOwnership(Long userId, Long targetUserId);
+
+  String upload(MultipartFile file) throws IOException;
+
+  RestResponse checkResource(Resource resource) throws Exception;
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 2a38308cb..b7c37eee6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -108,6 +108,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.streampark.console.core.enums.Operation.RELEASE;
+
 @Service
 @Slf4j
 @Transactional(propagation = Propagation.SUPPORTS, rollbackFor = 
Exception.class)
@@ -170,8 +172,7 @@ public class AppBuildPipeServiceImpl
 
     Application app = applicationService.getById(appId);
     ApplicationLog applicationLog = new ApplicationLog();
-    applicationLog.setOptionName(
-        org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+    applicationLog.setOptionName(RELEASE.getValue());
     applicationLog.setAppId(app.getId());
     applicationLog.setOptionTime(new Date());
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 7c9140dc1..ee2d996d5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -96,7 +96,6 @@ import 
org.apache.streampark.flink.packer.pipeline.BuildResult;
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CoreOptions;
@@ -120,7 +119,6 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.multipart.MultipartFile;
 
 import javax.annotation.Nonnull;
 import javax.annotation.PostConstruct;
@@ -307,24 +305,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     return map;
   }
 
-  @Override
-  public String upload(MultipartFile file) throws Exception {
-    File temp = WebUtils.getAppTempDir();
-    String fileName = 
FilenameUtils.getName(Objects.requireNonNull(file.getOriginalFilename()));
-    File saveFile = new File(temp, fileName);
-    // delete when exists
-    if (saveFile.exists()) {
-      saveFile.delete();
-    }
-    // save file to temp dir
-    try {
-      file.transferTo(saveFile);
-    } catch (Exception e) {
-      throw new ApiDetailException(e);
-    }
-    return saveFile.getAbsolutePath();
-  }
-
   @Override
   public void toEffective(Application application) {
     // set latest to Effective
@@ -861,57 +841,60 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(), 
appParam.getTeamId()));
 
     application.setRelease(ReleaseState.NEED_RELEASE.get());
+
+    // 1) jar job jar file changed
     if (application.isUploadJob()) {
-      if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
+      if (!Objects.equals(application.getJar(), appParam.getJar())) {
         application.setBuild(true);
       } else {
         File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
         if (jarFile.exists()) {
-          long checkSum = 0;
           try {
-            checkSum = FileUtils.checksumCRC32(jarFile);
+            long checkSum = FileUtils.checksumCRC32(jarFile);
+            if (!Objects.equals(checkSum, application.getJarCheckSum())) {
+              application.setBuild(true);
+            }
           } catch (IOException e) {
             log.error("Error in checksumCRC32 for {}.", jarFile);
             throw new RuntimeException(e);
           }
-          if (!ObjectUtils.safeEquals(checkSum, application.getJarCheckSum())) 
{
-            application.setBuild(true);
-          }
-        }
-      }
-    }
-
-    if (!application.getBuild()) {
-      if (!application.getExecutionMode().equals(appParam.getExecutionMode())) 
{
-        if 
(appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
-            || 
application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
-          application.setBuild(true);
         }
       }
     }
 
-    if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
-      if (!ObjectUtils.safeTrimEquals(
+    // 2) k8s podTemplate changed..
+    if (application.getBuild() && 
ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
+      if (ObjectUtils.trimNoEquals(
               application.getK8sRestExposedType(), 
appParam.getK8sRestExposedType())
-          || !ObjectUtils.safeTrimEquals(
+          || ObjectUtils.trimNoEquals(
               application.getK8sJmPodTemplate(), 
appParam.getK8sJmPodTemplate())
-          || !ObjectUtils.safeTrimEquals(
+          || ObjectUtils.trimNoEquals(
               application.getK8sTmPodTemplate(), 
appParam.getK8sTmPodTemplate())
-          || !ObjectUtils.safeTrimEquals(
+          || ObjectUtils.trimNoEquals(
               application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
-          || !ObjectUtils.safeTrimEquals(
+          || ObjectUtils.trimNoEquals(
               application.getK8sHadoopIntegration(), 
appParam.getK8sHadoopIntegration())
-          || !ObjectUtils.safeTrimEquals(application.getFlinkImage(), 
appParam.getFlinkImage())) {
+          || ObjectUtils.trimNoEquals(application.getFlinkImage(), 
appParam.getFlinkImage())) {
         application.setBuild(true);
       }
     }
 
-    // when flink version has changed, we should rebuild the application. 
Otherwise, the shims jar
-    // may be not suitable for the new flink version.
-    if (!ObjectUtils.safeEquals(application.getVersionId(), 
appParam.getVersionId())) {
+    // 3) flink version changed
+    if (!application.getBuild()
+        && !Objects.equals(application.getVersionId(), 
appParam.getVersionId())) {
       application.setBuild(true);
     }
 
+    // 4) yarn application mode change
+    if (!application.getBuild()) {
+      if (!application.getExecutionMode().equals(appParam.getExecutionMode())) 
{
+        if 
(appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
+            || 
application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
+          application.setBuild(true);
+        }
+      }
+    }
+
     appParam.setJobType(application.getJobType());
     // changes to the following parameters need to be re-release to take effect
     application.setJobName(appParam.getJobName());
@@ -959,15 +942,16 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     // Flink Sql job...
     if (application.isFlinkSqlJob()) {
       updateFlinkSqlJob(application, appParam);
+      return true;
+    }
+
+    if (application.isStreamParkJob()) {
+      configService.update(appParam, application.isRunning());
     } else {
-      if (application.isStreamParkJob()) {
-        configService.update(appParam, application.isRunning());
-      } else {
-        application.setJar(appParam.getJar());
-        application.setMainClass(appParam.getMainClass());
-      }
+      application.setJar(appParam.getJar());
+      application.setMainClass(appParam.getMainClass());
     }
-    baseMapper.updateById(application);
+    this.updateById(application);
     return true;
   }
 
@@ -1039,6 +1023,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         }
       }
     }
+    this.updateById(application);
     this.configService.update(appParam, application.isRunning());
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index f702ad004..de3ec905c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -19,11 +19,16 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.base.util.JacksonUtils;
 import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.bean.Dependency;
+import org.apache.streampark.console.core.bean.FlinkConnectorResource;
 import org.apache.streampark.console.core.bean.Pom;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkSql;
@@ -34,25 +39,47 @@ import 
org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.ResourceService;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenTool;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.Factory;
+import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.multipart.MultipartFile;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Scanner;
+import java.util.ServiceLoader;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -65,6 +92,8 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
   @Autowired private CommonService commonService;
   @Autowired private FlinkSqlService flinkSqlService;
 
+  public ResourceServiceImpl() {}
+
   @Override
   public IPage<Resource> page(Resource resource, RestRequest restRequest) {
     if (resource.getTeamId() == null) {
@@ -86,50 +115,53 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
   }
 
   @Override
-  public void addResource(Resource resource) {
+  public void addResource(Resource resource) throws Exception {
     String resourceStr = resource.getResource();
     ApiAlertException.throwIfNull(resourceStr, "Please add pom or jar 
resource.");
 
-    if (resource.getResourceType() == ResourceType.GROUP) {
-      ApiAlertException.throwIfNull(
-          resource.getResourceName(), "The name of resource group is 
required.");
+    // check
+    Dependency dependency = Dependency.toDependency(resourceStr);
+    List<String> jars = dependency.getJar();
+    List<Pom> poms = dependency.getPom();
+
+    ApiAlertException.throwIfTrue(
+        jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource.");
+
+    ApiAlertException.throwIfTrue(
+        resource.getResourceType() == ResourceType.FLINK_APP && jars.isEmpty(),
+        "Please upload jar for Flink_App resource");
+
+    ApiAlertException.throwIfTrue(
+        jars.size() + poms.size() > 1, "Please do not add multi dependency at 
one time.");
+
+    if (resource.getResourceType() != ResourceType.CONNECTOR) {
+      ApiAlertException.throwIfNull(resource.getResourceName(), "The 
resourceName is required.");
     } else {
-      Dependency dependency = Dependency.toDependency(resourceStr);
-      List<String> jars = dependency.getJar();
-      List<Pom> poms = dependency.getPom();
-
-      ApiAlertException.throwIfTrue(
-          jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource.");
-      ApiAlertException.throwIfTrue(
-          jars.size() + poms.size() > 1, "Please do not add multi dependency 
at one time.");
-      ApiAlertException.throwIfTrue(
-          resource.getResourceType() == ResourceType.FLINK_APP && 
jars.isEmpty(),
-          "Please upload jar for Flink_App resource");
-
-      Long teamId = resource.getTeamId();
-      String resourceName = null;
-
-      if (poms.isEmpty()) {
-        resourceName = jars.get(0);
-        ApiAlertException.throwIfTrue(
-            this.findByResourceName(teamId, resourceName) != null,
-            String.format("Sorry, the resource %s already exists.", 
resourceName));
-
-        // copy jar to team upload directory
-        transferTeamResource(teamId, resourceName);
-      } else {
-        Pom pom = poms.get(0);
-        resourceName =
-            String.format("%s:%s:%s", pom.getGroupId(), pom.getArtifactId(), 
pom.getVersion());
-        if (StringUtils.isNotBlank(pom.getClassifier())) {
-          resourceName = resourceName + ":" + pom.getClassifier();
-        }
-        ApiAlertException.throwIfTrue(
-            this.findByResourceName(teamId, resourceName) != null,
-            String.format("Sorry, the resource %s already exists.", 
resourceName));
+      String connector = resource.getConnector();
+      ApiAlertException.throwIfTrue(connector == null, "the flink connector is 
null.");
+      FlinkConnectorResource connectorResource =
+          JacksonUtils.read(connector, FlinkConnectorResource.class);
+      resource.setResourceName(connectorResource.getFactoryIdentifier());
+      if (connectorResource.getRequiredOptions() != null) {
+        resource.setConnectorRequiredOptions(
+            JacksonUtils.write(connectorResource.getRequiredOptions()));
       }
+      if (connectorResource.getOptionalOptions() != null) {
+        resource.setConnectorOptionalOptions(
+            JacksonUtils.write(connectorResource.getOptionalOptions()));
+      }
+    }
 
-      resource.setResourceName(resourceName);
+    ApiAlertException.throwIfTrue(
+        this.findByResourceName(resource.getTeamId(), 
resource.getResourceName()) != null,
+        String.format("Sorry, the resource %s already exists.", 
resource.getResourceName()));
+
+    if (!jars.isEmpty()) {
+      String resourcePath = jars.get(0);
+      resource.setResourcePath(resourcePath);
+      // copy jar to team upload directory
+      String upFile = resourcePath.split(":")[1];
+      transferTeamResource(resource.getTeamId(), upFile);
     }
 
     resource.setCreatorId(commonService.getUserId());
@@ -155,7 +187,12 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
       ApiAlertException.throwIfFalse(
           resourceName.equals(findResource.getResourceName()),
           "Please make sure the resource name is not changed.");
-      transferTeamResource(findResource.getTeamId(), resourceName);
+
+      Dependency dependency = Dependency.toDependency(resource.getResource());
+      if (!dependency.getJar().isEmpty()) {
+        String jarFile = dependency.getJar().get(0).split(":")[1];
+        transferTeamResource(findResource.getTeamId(), jarFile);
+      }
     }
 
     findResource.setDescription(resource.getDescription());
@@ -199,15 +236,234 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
     this.baseMapper.update(null, updateWrapper);
   }
 
-  private void transferTeamResource(Long teamId, String resourceName) {
+  /**
+   * @param file
+   * @return
+   */
+  @Override
+  public String upload(MultipartFile file) throws IOException {
+    File temp = WebUtils.getAppTempDir();
+
+    String name = file.getOriginalFilename();
+    String suffix = name.substring(name.lastIndexOf("."));
+
+    String sha256Hex = DigestUtils.sha256Hex(file.getInputStream());
+    String fileName = sha256Hex.concat(suffix);
+
+    File saveFile = new File(temp, fileName);
+
+    if (!saveFile.exists()) {
+      // save file to temp dir
+      try {
+        file.transferTo(saveFile);
+      } catch (Exception e) {
+        throw new ApiDetailException(e);
+      }
+    }
+
+    return saveFile.getAbsolutePath();
+  }
+
+  @Override
+  public RestResponse checkResource(Resource resourceParam) throws 
JsonProcessingException {
+    ResourceType type = resourceParam.getResourceType();
+    Map<String, Serializable> resp = new HashMap<>(0);
+    resp.put("state", 0);
+    switch (type) {
+      case FLINK_APP:
+        // check main.
+        File jarFile;
+        try {
+          jarFile = getResourceJar(resourceParam);
+        } catch (Exception e) {
+          // get jarFile error
+          resp.put("state", 1);
+          resp.put("exception", Utils.stringifyException(e));
+          return RestResponse.success().data(resp);
+        }
+        Manifest manifest = Utils.getJarManifest(jarFile);
+        String mainClass = manifest.getMainAttributes().getValue("Main-Class");
+
+        if (mainClass == null) {
+          // main class is null
+          resp.put("state", 2);
+          return RestResponse.success().data(resp);
+        }
+        return RestResponse.success().data(resp);
+      case CONNECTOR:
+        // 1) get connector id
+        FlinkConnectorResource connectorResource;
+
+        ApiAlertException.throwIfFalse(
+            ResourceType.CONNECTOR.equals(resourceParam.getResourceType()),
+            "getConnectorId method error, resource not flink connector.");
+
+        List<File> jars;
+        File connector = null;
+        List<String> factories;
+
+        Dependency dependency = 
Dependency.toDependency(resourceParam.getResource());
+
+        // 1) get connector jar
+        if (!dependency.getPom().isEmpty()) {
+          Artifact artifact = dependency.toArtifact().get(0);
+          try {
+            jars = MavenTool.resolveArtifacts(artifact);
+          } catch (Exception e) {
+            // connector download is null
+            resp.put("state", 1);
+            resp.put("exception", Utils.stringifyException(e));
+            return RestResponse.success().data(resp);
+          }
+          String fileName = String.format("%s-%s.jar", artifact.artifactId(), 
artifact.version());
+          Optional<File> file = jars.stream().filter(x -> 
x.getName().equals(fileName)).findFirst();
+          if (file.isPresent()) {
+            connector = file.get();
+          }
+        } else {
+          // 2) jar
+          String jar = dependency.getJar().get(0).split(":")[1];
+          File file = new File(jar);
+          connector = file;
+          jars = Collections.singletonList(file);
+        }
+
+        // 2) parse connector Factory
+        try {
+          factories = getConnectorFactory(connector);
+        } catch (Exception e) {
+          // flink connector invalid
+          resp.put("state", 2);
+          resp.put("exception", Utils.stringifyException(e));
+          return RestResponse.success().data(resp);
+        }
+
+        // 3) get connector resource
+        connectorResource = getConnectorResource(jars, factories);
+        if (connectorResource == null) {
+          // connector is null
+          resp.put("state", 3);
+          return RestResponse.success().data(resp);
+        }
+
+        // 2) check connector exists
+        boolean exists =
+            existsFlinkConnector(resourceParam.getId(), 
connectorResource.getFactoryIdentifier());
+        if (exists) {
+          resp.put("state", 4);
+          resp.put("name", connectorResource.getFactoryIdentifier());
+          return RestResponse.success(resp);
+        }
+
+        if (resourceParam.getId() != null) {
+          Resource resource = getById(resourceParam.getId());
+          if 
(!resource.getResourceName().equals(connectorResource.getFactoryIdentifier())) {
+            resp.put("state", 5);
+            return RestResponse.success().data(resp);
+          }
+        }
+        resp.put("state", 0);
+        resp.put("connector", JacksonUtils.write(connectorResource));
+        return RestResponse.success().data(resp);
+    }
+    return RestResponse.success().data(resp);
+  }
+
+  private boolean existsFlinkConnector(Long id, String connectorId) {
+    LambdaQueryWrapper<Resource> lambdaQueryWrapper =
+        new LambdaQueryWrapper<Resource>().eq(Resource::getResourceName, 
connectorId);
+    if (id != null) {
+      lambdaQueryWrapper.ne(Resource::getId, id);
+    }
+    return getBaseMapper().exists(lambdaQueryWrapper);
+  }
+
+  private FlinkConnectorResource getConnectorResource(List<File> jars, 
List<String> factories) {
+    Class<Factory> className = Factory.class;
+    URL[] array =
+        jars.stream()
+            .map(
+                x -> {
+                  try {
+                    return x.toURI().toURL();
+                  } catch (MalformedURLException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .toArray(URL[]::new);
+
+    try (URLClassLoader urlClassLoader = URLClassLoader.newInstance(array)) {
+      ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className, 
urlClassLoader);
+      for (Factory factory : serviceLoader) {
+        String factoryClassName = factory.getClass().getName();
+        if (factories.contains(factoryClassName)) {
+          FlinkConnectorResource connectorResource = new 
FlinkConnectorResource();
+          try {
+            connectorResource.setClassName(factoryClassName);
+            
connectorResource.setFactoryIdentifier(factory.factoryIdentifier());
+          } catch (Exception ignored) {
+          }
+
+          try {
+            Map<String, String> requiredOptions = new HashMap<>(0);
+            factory
+                .requiredOptions()
+                .forEach(x -> requiredOptions.put(x.key(), 
getOptionDefaultValue(x)));
+            connectorResource.setRequiredOptions(requiredOptions);
+          } catch (Exception ignored) {
+
+          }
+
+          try {
+            Map<String, String> optionalOptions = new HashMap<>(0);
+            factory
+                .optionalOptions()
+                .forEach(x -> optionalOptions.put(x.key(), 
getOptionDefaultValue(x)));
+            connectorResource.setOptionalOptions(optionalOptions);
+          } catch (Exception ignored) {
+          }
+          return connectorResource;
+        }
+      }
+      return null;
+    } catch (Exception e) {
+      log.error("getConnectorResource failed. " + e);
+    }
+    return null;
+  }
+
+  private File getResourceJar(Resource resource) throws Exception {
+    Dependency dependency = Dependency.toDependency(resource.getResource());
+    if (dependency.isEmpty()) {
+      return null;
+    }
+    if (!dependency.getJar().isEmpty()) {
+      String jar = dependency.getJar().get(0).split(":")[1];
+      return new File(jar);
+    } else {
+      Artifact artifact = dependency.toArtifact().get(0);
+      List<File> files = MavenTool.resolveArtifacts(artifact);
+      if (!files.isEmpty()) {
+        String fileName = String.format("%s-%s.jar", artifact.artifactId(), 
artifact.version());
+        Optional<File> jarFile =
+            files.stream().filter(x -> 
x.getName().equals(fileName)).findFirst();
+        if (jarFile.isPresent()) {
+          return jarFile.get();
+        }
+      }
+      return null;
+    }
+  }
+
+  private void transferTeamResource(Long teamId, String resourcePath) {
     String teamUploads = String.format("%s/%d", 
Workspace.local().APP_UPLOADS(), teamId);
     if (!FsOperator.lfs().exists(teamUploads)) {
       FsOperator.lfs().mkdirs(teamUploads);
     }
-    File localJar = new File(WebUtils.getAppTempDir(), resourceName);
-    File teamUploadJar = new File(teamUploads, resourceName);
+    File localJar = new File(resourcePath);
+    File teamUploadJar = new File(teamUploads, localJar.getName());
     ApiAlertException.throwIfFalse(
-        localJar.exists(), "Missing file: " + resourceName + ", please upload 
again");
+        localJar.exists(), "Missing file: " + resourcePath + ", please upload 
again");
     FsOperator.lfs()
         .upload(localJar.getAbsolutePath(), teamUploadJar.getAbsolutePath(), 
false, true);
   }
@@ -246,4 +502,36 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
 
     return dependApplications;
   }
+
+  private List<String> getConnectorFactory(File connector) throws Exception {
+    String configFile = 
"META-INF/services/org.apache.flink.table.factories.Factory";
+    JarFile jarFile = new JarFile(connector);
+    JarEntry entry = jarFile.getJarEntry(configFile);
+    if (entry == null) {
+      throw new IllegalArgumentException("invalid flink connector");
+    }
+    List<String> factories = new ArrayList<>(0);
+    try (InputStream inputStream = jarFile.getInputStream(entry)) {
+      Scanner scanner = new Scanner(new InputStreamReader(inputStream));
+      while (scanner.hasNextLine()) {
+        String line = scanner.nextLine().trim();
+        if (line.length() > 0 && !line.startsWith("#")) {
+          factories.add(line);
+        }
+      }
+      scanner.close();
+    }
+    return factories;
+  }
+
+  private String getOptionDefaultValue(ConfigOption<?> option) {
+    if (!option.hasDefaultValue()) {
+      return null;
+    }
+    Object value = option.defaultValue();
+    if (value instanceof Duration) {
+      return value.toString().replace("PT", "").toLowerCase();
+    }
+    return value.toString();
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
index b035253f2..f04adad00 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java
@@ -25,7 +25,7 @@ import lombok.Data;
 @Data
 public class JWTToken implements AuthenticationToken {
 
-  private static final long serialVersionUID = 1282057025599826155L;
+  private static final long serialVersionUID = 1L;
 
   private String token;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
index cc89f609d..901b61153 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java
@@ -32,7 +32,7 @@ import java.util.Date;
 @TableName("t_access_token")
 public class AccessToken implements Serializable {
 
-  private static final long serialVersionUID = 7187628714679791772L;
+  private static final long serialVersionUID = 1L;
   public static final String DEFAULT_EXPIRE_TIME = "9999-01-01 00:00:00";
   public static final String IS_API_TOKEN = "is_api_token";
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java
index d3e7e029b..6c85a3b96 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java
@@ -29,7 +29,7 @@ import java.util.Date;
 @Data
 public class Member implements Serializable {
 
-  private static final long serialVersionUID = -3166012934498268403L;
+  private static final long serialVersionUID = 1L;
 
   @TableId(type = IdType.AUTO)
   private Long id;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java
index 61533daa2..17d9e914e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java
@@ -32,7 +32,7 @@ import java.util.Date;
 @TableName("t_menu")
 public class Menu implements Serializable {
 
-  private static final long serialVersionUID = 7187628714679791771L;
+  private static final long serialVersionUID = 1L;
 
   public static final String TYPE_MENU = "0";
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java
index 16d853b30..ed412d340 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java
@@ -32,7 +32,7 @@ import java.util.Date;
 @TableName("t_role")
 public class Role implements Serializable {
 
-  private static final long serialVersionUID = -1714476694755654924L;
+  private static final long serialVersionUID = 1L;
 
   @TableId(type = IdType.AUTO)
   private Long roleId;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
index e2f28c6b2..5658e9c2a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
 @Data
 public class RoleMenu implements Serializable {
 
-  private static final long serialVersionUID = -7573904024872252113L;
+  private static final long serialVersionUID = 1L;
 
   @TableId(type = IdType.AUTO)
   private Long id;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java
index cdfd51572..cdf029eda 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java
@@ -29,7 +29,7 @@ import java.util.Date;
 @TableName("t_log")
 public class SysLog implements Serializable {
 
-  private static final long serialVersionUID = -8878596941954995444L;
+  private static final long serialVersionUID = 1L;
 
   @TableId(type = IdType.AUTO)
   private Long id;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java
index 415fb43e0..8d7bca18e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java
@@ -31,7 +31,7 @@ import java.util.Date;
 @TableName("t_team")
 public class Team implements Serializable {
 
-  private static final long serialVersionUID = -1714476694755654924L;
+  private static final long serialVersionUID = 1L;
 
   @TableId(type = IdType.AUTO)
   private Long id;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
index 4b3fc3375..7a328a1f9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
@@ -37,7 +37,7 @@ import java.util.Date;
 @TableName("t_user")
 public class User implements Serializable {
 
-  private static final long serialVersionUID = -4852732617765810959L;
+  private static final long serialVersionUID = 1L;
   /** user status */
   public static final String STATUS_VALID = "1";
 
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index fcb05c25e..16b6503f1 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -220,11 +220,14 @@ create table if not exists `t_resource` (
   `id` bigint generated by default as identity not null,
   `resource_name` varchar(128) not null comment 'The name of the resource 
file',
   `resource_type` int not null comment '0:app 1:common 2:connector 3:format 
4:udf',
+  `resource_path` varchar(255) default null,
   `resource` text ,
   `engine_type` int not null comment 'compute engine type, 0:apache flink 
1:apache spark',
   `main_class` varchar(255) default null,
   `description` text default null comment 'More detailed description of 
resource',
   `creator_id` bigint not null comment 'user id of creator',
+  `connector_required_options` text default null,
+  `connector_optional_options` text default null,
   `team_id` bigint not null comment 'team id',
   `create_time` datetime not null default current_timestamp comment 'create 
time',
   `modify_time` datetime not null default current_timestamp on update 
current_timestamp comment 'modify time',
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml
index c8020991a..21228ca97 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml
@@ -20,11 +20,14 @@
     <resultMap id="ResourceMap" 
type="org.apache.streampark.console.core.entity.Resource">
         <result column="id" jdbcType="BIGINT" property="id"/>
         <result column="resource_name" jdbcType="VARCHAR" 
property="resourceName"/>
+        <result column="connector_path" jdbcType="VARCHAR" 
property="connectorPath"/>
         <result column="description" jdbcType="VARCHAR" 
property="description"/>
         <result column="resource_type" jdbcType="BIGINT" 
property="resourceType"/>
         <result column="resource" jdbcType="VARCHAR" property="resource"/>
         <result column="engine_type" jdbcType="BIGINT" property="engineType"/>
         <result column="creator_id" jdbcType="BIGINT" property="creatorId"/>
+        <result column="connector_required_options" jdbcType="LONGVARCHAR" 
property="connectorRequiredOptions"/>
+        <result column="connector_optional_options" jdbcType="LONGVARCHAR" 
property="connectorOptionalOptions"/>
         <result column="team_id" jdbcType="BIGINT" property="teamId"/>
         <result column="create_time" jdbcType="TIMESTAMP" 
property="createTime"/>
         <result column="modify_time" jdbcType="TIMESTAMP" 
property="modifyTime"/>
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
new file mode 100644
index 000000000..10dc40ccb
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+import org.apache.streampark.common.conf.CommonConfig;
+import org.apache.streampark.common.conf.InternalConfigHolder;
+import org.apache.streampark.console.core.bean.FlinkConnectorResource;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenTool;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.Factory;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Scanner;
+import java.util.ServiceLoader;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+@Slf4j
+class DependencyUtilsTest {
+
+  @Test
+  public void resolveFlinkConnector() throws Exception {
+
+    Artifact artifact = new Artifact("com.ververica", 
"flink-connector-mysql-cdc", "2.4.1", null);
+
+    InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), 
"~/tmp");
+
+    List<File> files = MavenTool.resolveArtifacts(artifact);
+    if (files.isEmpty()) {
+      return;
+    }
+
+    String fileName = String.format("%s-%s.jar", artifact.artifactId(), 
artifact.version());
+    Optional<File> jarFile = files.stream().filter(x -> 
x.getName().equals(fileName)).findFirst();
+    File connector = jarFile.get();
+
+    List<String> factories = getConnectorFactory(connector);
+
+    Class<Factory> className = Factory.class;
+    URL[] array =
+        files.stream()
+            .map(
+                x -> {
+                  try {
+                    return x.toURI().toURL();
+                  } catch (MalformedURLException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .toArray(URL[]::new);
+
+    URLClassLoader urlClassLoader = URLClassLoader.newInstance(array);
+    ServiceLoader<Factory> serviceLoader = ServiceLoader.load(className, 
urlClassLoader);
+
+    List<FlinkConnectorResource> connectorResources = new ArrayList<>();
+    try {
+      for (Factory factory : serviceLoader) {
+        String factoryClassName = factory.getClass().getName();
+        if (factories.contains(factoryClassName)) {
+          FlinkConnectorResource connectorResource = new 
FlinkConnectorResource();
+          connectorResource.setClassName(factoryClassName);
+          connectorResource.setFactoryIdentifier(factory.factoryIdentifier());
+          Map<String, String> requiredOptions = new HashMap<>(0);
+          factory
+              .requiredOptions()
+              .forEach(x -> requiredOptions.put(x.key(), 
getOptionDefaultValue(x)));
+          connectorResource.setRequiredOptions(requiredOptions);
+
+          Map<String, String> optionalOptions = new HashMap<>(0);
+          factory
+              .optionalOptions()
+              .forEach(x -> optionalOptions.put(x.key(), 
getOptionDefaultValue(x)));
+          connectorResource.setOptionalOptions(optionalOptions);
+
+          connectorResources.add(connectorResource);
+        }
+      }
+    } catch (Throwable e) {
+      e.printStackTrace();
+    }
+    urlClassLoader.close();
+    System.out.println(connectorResources);
+  }
+
+  private String getOptionDefaultValue(ConfigOption<?> option) {
+    if (!option.hasDefaultValue()) {
+      return null;
+    }
+    Object value = option.defaultValue();
+    if (value instanceof Duration) {
+      return value.toString().replace("PT", "").toLowerCase();
+    }
+    return value.toString();
+  }
+
+  @Test
+  public void testDuration() {
+    String s = "PT30H";
+    Duration duration = Duration.parse(s);
+    System.out.println(duration.getSeconds());
+  }
+
+  private List<String> getConnectorFactory(File connector) throws Exception {
+    String configFile = 
"META-INF/services/org.apache.flink.table.factories.Factory";
+    JarFile jarFile = new JarFile(connector);
+    JarEntry entry = jarFile.getJarEntry(configFile);
+    List<String> factories = new ArrayList<>(0);
+    try (InputStream inputStream = jarFile.getInputStream(entry)) {
+      Scanner scanner = new Scanner(new InputStreamReader(inputStream));
+      while (scanner.hasNextLine()) {
+        String line = scanner.nextLine().trim();
+        if (line.length() > 0 && !line.startsWith("#")) {
+          factories.add(line);
+        }
+      }
+      scanner.close();
+    }
+    return factories;
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
index aae093e7f..ea5c34560 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
@@ -23,22 +23,13 @@ import 
org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.YarnQueue;
 import org.apache.streampark.console.core.service.impl.ApplicationServiceImpl;
 
-import org.apache.hc.core5.http.ContentType;
-
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import org.h2.store.fs.FileUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.mock.web.MockMultipartFile;
-import org.springframework.web.multipart.MultipartFile;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.nio.file.Path;
 import java.util.Date;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -105,26 +96,6 @@ class ApplicationServiceTest extends SpringTestBase {
     applicationService.start(application, false);
   }
 
-  @Test
-  void testUpload(@TempDir Path tempDir) throws Exception {
-    // specify the file path
-    File fileToStoreUploadFile =
-        new File(tempDir.toFile().getAbsolutePath() + 
"/fileToStoreUploadFile");
-    FileUtils.createFile(fileToStoreUploadFile.getAbsolutePath());
-
-    File fileToUpload = new File(tempDir.toFile().getAbsolutePath() + 
"/fileToUpload.jar");
-    FileUtils.createFile(fileToUpload.getAbsolutePath());
-    assertThat(fileToUpload).exists();
-    MultipartFile mulFile =
-        new MockMultipartFile(
-            "test", // fileName (eg: streampark.jar)
-            fileToUpload.getAbsolutePath(), // originalFilename (eg: path + 
fileName =
-            // /tmp/file/streampark.jar)
-            ContentType.APPLICATION_OCTET_STREAM.toString(),
-            new FileInputStream(fileToStoreUploadFile));
-    applicationService.upload(mulFile);
-  }
-
   @Test
   void testCheckQueueValidationIfNeeded() {
     ApplicationServiceImpl applicationServiceImpl = (ApplicationServiceImpl) 
applicationService;
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
new file mode 100644
index 000000000..e6c00ec71
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.SpringTestBase;
+
+import org.apache.hc.core5.http.ContentType;
+
+import org.h2.store.fs.FileUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.mock.web.MockMultipartFile;
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** org.apache.streampark.console.core.service.ResourceServiceTest. */
+class ResourceServiceTest extends SpringTestBase {
+
+  @Autowired private ResourceService resourceService;
+
+  @Test
+  void testUpload(@TempDir Path tempDir) throws Exception {
+    // specify the file path
+    File fileToStoreUploadFile =
+        new File(tempDir.toFile().getAbsolutePath() + 
"/fileToStoreUploadFile");
+    FileUtils.createFile(fileToStoreUploadFile.getAbsolutePath());
+
+    File fileToUpload = new File(tempDir.toFile().getAbsolutePath() + 
"/fileToUpload.jar");
+    FileUtils.createFile(fileToUpload.getAbsolutePath());
+    assertThat(fileToUpload).exists();
+    MultipartFile mulFile =
+        new MockMultipartFile(
+            "test", // fileName (eg: streampark.jar)
+            fileToUpload.getAbsolutePath(), // originalFilename (eg: path + 
fileName =
+            // /tmp/file/streampark.jar)
+            ContentType.APPLICATION_OCTET_STREAM.toString(),
+            new FileInputStream(fileToStoreUploadFile));
+    resourceService.upload(mulFile);
+  }
+}
diff --git 
a/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts 
b/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts
index 1b279c7a6..a0d045284 100644
--- 
a/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts
+++ 
b/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts
@@ -23,13 +23,16 @@ import {
   ResourceListRecord,
   ResourceParam,
 } from './model/resourceModel';
+import { ContentTypeEnum } from '/@/enums/httpEnum';
 
 enum RESOURCE_API {
   PAGE = '/resource/page',
   POST = '/resource/add',
   UPDATE = '/resource/update',
+  UPLOAD = '/resource/upload',
   DELETE = '/resource/delete',
   LIST = '/resource/list',
+  CHECK = '/resource/check',
 }
 
 /**
@@ -76,3 +79,18 @@ export function fetchResourceDelete(data: 
ResourceDeleteParam): Promise<AxiosRes
 export function fetchTeamResource(data: Recordable): 
Promise<ResourceListRecord[]> {
   return defHttp.post({ url: RESOURCE_API.LIST, data });
 }
+
+export function checkResource(data: ResourceParam): 
Promise<AxiosResponse<Result>> {
+  return defHttp.post({ url: RESOURCE_API.CHECK, data });
+}
+
+export function fetchUpload(params) {
+  return defHttp.post<string>({
+    url: RESOURCE_API.UPLOAD,
+    params,
+    headers: {
+      'Content-Type': ContentTypeEnum.FORM_DATA,
+    },
+    timeout: 1000 * 60 * 10, // Uploading files timed out for 10 minutes
+  });
+}
diff --git 
a/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts
 
b/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts
index 4de0f0f5e..3c720d38a 100644
--- 
a/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts
+++ 
b/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts
@@ -34,6 +34,7 @@ export interface ResourceListRecord {
 export interface ResourceParam {
   id?: string;
   resourceName: string;
+  connector?: string;
   engineType: string;
   description: string;
 }
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts
index 19ffd1618..7e031498c 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts
@@ -21,24 +21,32 @@ export default {
   deleteResource: 'Delete Resource',
   deletePopConfirm: 'Are you sure delete this resource ?',
   uploadResource: 'Upload Resource',
+  resourceName: 'Resource Name',
   resourceType: 'Resource Type',
   engineType: 'Engine Type',
   resourceGroup: 'Resource Group',
   groupName: 'Group Name',
+  resourceNamePlaceholder: 'Please input resource name',
   engineTypePlaceholder: 'Please select compute engine type',
   resourceGroupPlaceholder: 'Please choose resource',
   groupNamePlaceholder: 'Please input the group name',
   groupNameIsRequiredMessage: 'Group Name is required',
   multiPomTip: 'Do not add multiple dependencies at one time',
   addResourceTip: 'Please add a resource',
+  jarFileErrorTip: 'Jar file is null, please try again',
+  mainNullTip: 'Flink app invalid, main class is null',
+  connectorExistsTip: 'this connector already exists',
+  connectorInvalidTip: 'flink connector invalid, please check',
+  connectorInfoErrorTip: 'get flink connector information error.',
+  connectorModifyTip: 'this connector cannot be modified, because 
factoryIdentifier has changed',
   add: 'Add',
   success: ' successful',
   fail: ' failed',
   table: {
     title: 'Resource List',
     resourceName: 'Resource Name',
-    resourceNamePlaceholder: 'Please enter the resource name to search',
-    descriptionPlaceholder: 'Please enter description to search',
+    resourceNamePlaceholder: 'Please enter the resource name',
+    descriptionPlaceholder: 'Please enter description',
     createUser: 'Create User',
     createTime: 'Create Time',
     modifyTime: 'Modify Time',
@@ -46,6 +54,7 @@ export default {
   },
   form: {
     descriptionMessage: 'exceeds maximum length limit of 100 characters',
+    resourceNameIsRequiredMessage: 'resource name is required',
     engineTypeIsRequiredMessage: 'compute engine type is required',
     resourceTypeIsRequiredMessage: 'resource type is required',
     resourceTypePlaceholder: 'Please select resource type',
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts
index 023d0f36b..5b0dc99e5 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts
@@ -21,16 +21,24 @@ export default {
   deleteResource: '删除资源',
   deletePopConfirm: '你确定要删除这个资源?',
   uploadResource: '上传资源',
+  resourceName: '资源名称',
   resourceType: '资源类型',
   engineType: '计算引擎类型',
   resourceGroup: '资源组',
   groupName: '资源组名称',
   resourceGroupPlaceholder: '请选择组资源',
+  resourceNamePlaceholder: '请输入资源名称',
   groupNamePlaceholder: '请输入资源组名称',
   groupNameIsRequiredMessage: '资源组名称必填',
   engineTypePlaceholder: '请选择计算引擎类型',
   multiPomTip: '不支持同时添加多个依赖',
   addResourceTip: '请添加资源',
+  jarFileErrorTip: 'Jar 文件为空,请重试',
+  mainNullTip: 'Flink app 无效,主类为空',
+  connectorInvalidTip: '该连接器无效,请检查',
+  connectorExistsTip: '该连接器已经存在',
+  connectorModifyTip: '该连接器无法修改,factoryIdentifier 不能更改',
+  connectorInfoErrorTip: '获取改连接器信息出错',
   add: '添加',
   success: '成功',
   fail: '失败',
@@ -46,8 +54,9 @@ export default {
   },
   form: {
     descriptionMessage: '超过 100 个字符的最大长度限制',
-    engineTypeIsRequiredMessage: '计算引擎类型必选',
-    resourceTypeIsRequiredMessage: '资源类型必选',
+    resourceNameIsRequiredMessage: '资源名称为必填项',
+    engineTypeIsRequiredMessage: '计算引擎类型为必填项',
+    resourceTypeIsRequiredMessage: '资源类型为必填项',
     resourceTypePlaceholder: '请选择资源类型',
     exists: '资源已存在',
     empty: '资源不能为空',
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
index b6acf41cd..a45c73ed2 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less
@@ -186,27 +186,26 @@
   }
 
   .dependency-box {
-    margin-right: 10px;
     margin-top: 15px;
-    margin-bottom: -10px;
+    width: 100%;
     border-radius: 5px;
-    background-color: @background-color-base;
+    background-color: #e6f4ff;
+    border: 1px solid #91caff;
     display: inline-block;
+    margin-bottom: -25px;
 
     .dependency-item {
       position: relative;
       border: unset;
+      background-color: unset;
       line-height: 35px;
       padding: 0 6px;
       width: unset;
       float: left;
-      margin: 2px 4px 0;
-
       .ant-alert-close-icon {
         position: relative;
         left: 5px;
         top: 6px;
-        color: @background-color-base;
       }
     }
   }
@@ -267,8 +266,14 @@
 }
 
 [data-theme='dark'] {
-  .app_controller .icon-close {
-    color: #ffffff73;
+  .app_controller {
+    .icon-close {
+      color: #ffffff73;
+    }
+    .dependency-box {
+      background-color: #111b26;
+      border: 1px solid #153450;
+    }
   }
 }
 
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue
index 0cae4e79e..bdb7ae75a 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue
@@ -26,45 +26,48 @@
       <template #resetBefore> 1111 </template>
       <template #bodyCell="{ column, record }">
         <template v-if="column.dataIndex === 'resourceType'">
-          <Tag
-            class="bold-tag"
-            color="#52c41a"
-            v-if="record.resourceType == ResourceTypeEnum.FLINK_APP"
-          >
-            FLINK_APP
+          <Tag color="processing" v-if="record.resourceType === 
ResourceTypeEnum.FLINK_APP">
+            <template #icon>
+              <img :src="flinkAppSvg" class="svg-icon" alt="Flink App" />
+            </template>
+            Flink App
           </Tag>
-          <Tag
-            class="bold-tag"
-            color="#2db7f5"
-            v-if="record.resourceType == ResourceTypeEnum.NORMAL_JAR"
-          >
-            NORMAL_JAR
-          </Tag>
-          <Tag
-            class="bold-tag"
-            color="#108ee9"
-            v-if="record.resourceType == ResourceTypeEnum.CONNECTOR"
-          >
-            CONNECTOR
+
+          <Tag color="processing" v-if="record.resourceType === 
ResourceTypeEnum.CONNECTOR">
+            <template #icon>
+              <img :src="connectorSvg" class="svg-icon" alt="Connector" />
+            </template>
+            Connector
           </Tag>
-          <Tag class="bold-tag" color="#79f379" v-if="record.resourceType == 
ResourceTypeEnum.UDXF">
+
+          <Tag color="processing" v-if="record.resourceType === 
ResourceTypeEnum.UDXF">
+            <template #icon>
+              <img :src="udxfSvg" class="svg-icon" alt="UDXF" />
+            </template>
             UDXF
           </Tag>
-          <Tag
-            class="bold-tag"
-            color="#fcaa80"
-            v-if="record.resourceType == ResourceTypeEnum.GROUP"
-          >
+
+          <Tag color="processing" v-if="record.resourceType === 
ResourceTypeEnum.NORMAL_JAR">
+            <template #icon>
+              <img :src="normalJarSvg" class="svg-icon" alt="Normal jar" />
+            </template>
+            Normal Jar
+          </Tag>
+
+          <Tag color="processing" v-if="record.resourceType === 
ResourceTypeEnum.GROUP">
+            <template #icon>
+              <img :src="groupSvg" class="svg-icon" alt="GROUP" />
+            </template>
             GROUP
           </Tag>
         </template>
         <template v-if="column.dataIndex === 'engineType'">
-          <Tag class="bold-tag" color="#e65270" v-if="record.engineType == 
EngineTypeEnum.FLINK">
-            FLINK
-          </Tag>
-          <Tag class="bold-tag" color="#f5be07" v-if="record.engineType == 
EngineTypeEnum.SPARK">
-            SPARK
-          </Tag>
+          <span v-if="record.engineType === EngineTypeEnum.FLINK">
+            <SvgIcon name="flink" /> Apache Flink
+          </span>
+          <span v-if="record.engineType === EngineTypeEnum.SPARK">
+            <SvgIcon name="spark" /> Apache Spark
+          </span>
         </template>
         <template v-if="column.dataIndex === 'action'">
           <TableAction
@@ -115,6 +118,13 @@
   import { fetchResourceDelete, fetchResourceList, fetchTeamResource } from 
'/@/api/flink/resource';
   import { EngineTypeEnum, ResourceTypeEnum } from 
'/@/views/flink/resource/resource.data';
   import { Tag } from 'ant-design-vue';
+  import SvgIcon from '/@/components/Icon/src/SvgIcon.vue';
+
+  import flinkAppSvg from '/@/assets/icons/flink2.svg';
+  import connectorSvg from '/@/assets/icons/connector.svg';
+  import udxfSvg from '/@/assets/icons/fx.svg';
+  import normalJarSvg from '/@/assets/icons/jar.svg';
+  import groupSvg from '/@/assets/icons/group.svg';
 
   const teamResource = ref<Array<any>>([]);
   const [registerDrawer, { openDrawer }] = useDrawer();
@@ -176,7 +186,7 @@
     });
     if (data.status === 'success') {
       createMessage.success(t('flink.resource.deleteResource') + 
t('flink.resource.success'));
-      reload();
+      await reload();
       updateTeamResource();
     } else {
       createMessage.error(t('flink.resource.deleteResource') + 
t('flink.resource.fail'));
@@ -202,3 +212,17 @@
     updateTeamResource();
   });
 </script>
+
+<style lang="less" scoped>
+  .svg-icon {
+    display: inline-block;
+    width: 14px;
+    height: 14px;
+
+    .svg-connector {
+      svg path {
+        fill: #fff0f6 !important;
+      }
+    }
+  }
+</style>
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/resource/components/Resource.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/resource/components/Resource.vue
index 7c6db92e2..626180012 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/resource/components/Resource.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/resource/components/Resource.vue
@@ -28,8 +28,10 @@
   import { Icon } from '/@/components/Icon';
   import { useMonaco } from '/@/hooks/web/useMonaco';
   import { Tabs, Alert, Tag, Space } from 'ant-design-vue';
-  import { fetchUpload } from '/@/api/flink/app/app';
+  import { fetchUpload } from '/@/api/flink/resource';
+
   import UploadJobJar from '/@/views/flink/app/components/UploadJobJar.vue';
+  import { ResourceTypeEnum } from '/@/views/flink/resource/resource.data';
 
   interface DependencyType {
     artifactId: string;
@@ -136,9 +138,8 @@
       loading.value = true;
       const formData = new FormData();
       formData.append('file', data.file);
-      await fetchUpload(formData);
       dependency.jar = {};
-      dependency.jar[data.file.name] = data.file.name;
+      dependency.jar[data.file.name] = await fetchUpload(formData);
       handleUpdateDependency();
     } catch (error) {
       console.error(error);
@@ -155,15 +156,15 @@
       deps.push(dependency.pom[v]);
     });
     Object.keys(dependency.jar).forEach((v: string) => {
-      jars.push(v);
+      jars.push(v + ':' + dependency.jar[v]);
     });
-
     dependencyRecords.value = deps;
     uploadJars.value = jars;
   }
 
   function handleRemoveJar(jar: string) {
-    delete dependency.jar[jar];
+    console.log(jar);
+    delete dependency.jar[jar.split(':')[0]];
     handleUpdateDependency();
   }
 
@@ -211,7 +212,7 @@
 </script>
 
 <template>
-  <template v-if="props.formModel.resourceType == 'FLINK_APP'">
+  <template v-if="props.formModel.resourceType === ResourceTypeEnum.FLINK_APP">
     <UploadJobJar :custom-request="handleCustomDepsRequest" 
v-model:loading="loading" />
   </template>
   <template v-else>
@@ -237,7 +238,7 @@
       <template #message>
         <Space>
           <Tag class="tag-dependency" color="#108ee9">JAR</Tag>
-          {{ jar }}
+          {{ jar.split(':')[0] }}
           <Icon
             icon="ant-design:close-outlined"
             class="icon-close cursor-pointer"
@@ -251,7 +252,9 @@
 </template>
 
 <style lang="less">
-  @import url('/@/views/flink/app/styles/Add.less');
+  .dependency-box {
+    margin-top: 10px;
+  }
   .apply-pom {
     z-index: 99;
     position: absolute;
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/resource/components/ResourceDrawer.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/resource/components/ResourceDrawer.vue
index 2d217d3fe..ba9c1e61c 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/resource/components/ResourceDrawer.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/resource/components/ResourceDrawer.vue
@@ -41,18 +41,20 @@
 
 <script lang="ts" setup>
   import { ref, computed, unref } from 'vue';
-  import { BasicForm, FormSchema, useForm } from '/@/components/Form';
+  import { BasicForm, useForm } from '/@/components/Form';
   import { BasicDrawer, useDrawerInner } from '/@/components/Drawer';
   import { Icon } from '/@/components/Icon';
   import { useI18n } from '/@/hooks/web/useI18n';
   import Resource from '/@/views/flink/resource/components/Resource.vue';
-  import { fetchAddResource, fetchUpdateResource } from 
'/@/api/flink/resource';
-  import { EngineTypeEnum } from '/@/views/flink/resource/resource.data';
+  import { fetchAddResource, fetchUpdateResource, checkResource } from 
'/@/api/flink/resource';
+  import { EngineTypeEnum, ResourceTypeEnum } from 
'/@/views/flink/resource/resource.data';
   import {
+    renderEngineType,
     renderResourceType,
     renderStreamParkResourceGroup,
   } from '/@/views/flink/resource/useResourceRender';
   import { useMessage } from '/@/hooks/web/useMessage';
+  import { exceptionPropWidth } from '/@/utils';
 
   const emit = defineEmits(['success', 'register']);
 
@@ -70,7 +72,7 @@
   const resourceId = ref<Nullable<number>>(null);
   const resourceRef = ref();
 
-  const getResourceFormSchema = computed((): FormSchema[] => {
+  const getResourceFormSchema = computed(() => {
     return [
       {
         field: 'resourceType',
@@ -81,18 +83,22 @@
           { required: true, message: 
t('flink.resource.form.resourceTypeIsRequiredMessage') },
         ],
       },
+      {
+        field: 'resourceName',
+        label: t('flink.resource.resourceName'),
+        component: 'Input',
+        ifShow: ({ values }) => values?.resourceType !== 
ResourceTypeEnum.CONNECTOR,
+        componentProps: { placeholder: 
t('flink.resource.resourceNamePlaceholder') },
+        rules: [
+          { required: true, message: 
t('flink.resource.form.resourceNameIsRequiredMessage') },
+        ],
+      },
       {
         field: 'engineType',
         label: t('flink.resource.engineType'),
         component: 'Select',
+        render: ({ model }) => renderEngineType({ model }),
         defaultValue: EngineTypeEnum.FLINK,
-        componentProps: {
-          placeholder: t('flink.resource.engineTypePlaceholder'),
-          options: [
-            { label: 'apache flink', value: EngineTypeEnum.FLINK, disabled: 
false },
-            { label: 'apache spark', value: EngineTypeEnum.SPARK, disabled: 
true },
-          ],
-        },
         rules: [{ required: true, message: 
t('flink.resource.form.engineTypeIsRequiredMessage') }],
       },
       {
@@ -100,7 +106,7 @@
         label: t('flink.resource.groupName'),
         component: 'Input',
         componentProps: { placeholder: 
t('flink.resource.groupNamePlaceholder') },
-        ifShow: ({ values }) => values?.resourceType == 'GROUP',
+        ifShow: ({ values }) => values?.resourceType === 
ResourceTypeEnum.GROUP,
         rules: [{ required: true, message: 
t('flink.resource.groupNameIsRequiredMessage') }],
       },
       {
@@ -109,21 +115,21 @@
         component: 'Select',
         render: ({ model }) =>
           renderStreamParkResourceGroup({ model, resources: 
unref(props.teamResource) }),
-        ifShow: ({ values }) => values?.resourceType == 'GROUP',
+        ifShow: ({ values }) => values?.resourceType === 
ResourceTypeEnum.GROUP,
       },
       {
         field: 'dependency',
         label: t('flink.resource.addResource'),
         component: 'Input',
         slot: 'resource',
-        ifShow: ({ values }) => values?.resourceType !== 'GROUP',
+        ifShow: ({ values }) => values?.resourceType !== 
ResourceTypeEnum.GROUP,
       },
       {
         field: 'mainClass',
         label: t('flink.app.mainClass'),
         component: 'Input',
         componentProps: { placeholder: 
t('flink.app.addAppTips.mainClassPlaceholder') },
-        ifShow: ({ values }) => values?.resourceType == 'FLINK_APP',
+        ifShow: ({ values }) => values?.resourceType === 
ResourceTypeEnum.FLINK_APP,
         rules: [{ required: true, message: 
t('flink.app.addAppTips.mainClassIsRequiredMessage') }],
       },
       {
@@ -148,17 +154,16 @@
   const [registerDrawer, { setDrawerProps, closeDrawer }] = useDrawerInner(
     async (data: Recordable) => {
       unref(resourceRef)?.setDefaultValue({});
-      resetFields();
+      await resetFields();
       setDrawerProps({ confirmLoading: false });
       isUpdate.value = !!data?.isUpdate;
       if (unref(isUpdate)) {
         resourceId.value = data.record.id;
-        setFieldsValue(data.record);
-
-        if (data.record?.resourceType == 'GROUP') {
-          setFieldsValue({ resourceGroup: JSON.parse(data.record.resource || 
'[]') });
+        await setFieldsValue(data.record);
+        if (data.record?.resourceType === ResourceTypeEnum.GROUP) {
+          await setFieldsValue({ resourceGroup: 
JSON.parse(data.record.resource || '[]') });
         } else {
-          setFieldsValue({ dependency: data.record.resource });
+          await setFieldsValue({ dependency: data.record.resource });
           unref(resourceRef)?.setDefaultValue(JSON.parse(data.record.resource 
|| '{}'));
         }
       }
@@ -172,17 +177,17 @@
   // form submit
   async function handleSubmit() {
     try {
+      const id = resourceId.value;
+      resourceId.value = null;
       const values = await validate();
       let resourceJson = '';
-
-      if (values.resourceType == 'GROUP') {
+      if (values.resourceType == ResourceTypeEnum.GROUP) {
         resourceJson = JSON.stringify(values.resourceGroup);
       } else {
         const resource: { pom?: string; jar?: string } = {};
         unref(resourceRef).handleApplyPom();
         const dependencyRecords = unref(resourceRef)?.dependencyRecords;
         const uploadJars = unref(resourceRef)?.uploadJars;
-
         if (unref(dependencyRecords) && unref(dependencyRecords).length > 0) {
           if (unref(dependencyRecords).length > 1) {
             Swal.fire('Failed', t('flink.resource.multiPomTip'), 'error');
@@ -210,16 +215,69 @@
         }
 
         resourceJson = JSON.stringify(resource);
+        // check resource
+        const resp = await checkResource({
+          id: id,
+          resource: resourceJson,
+          ...values,
+        });
+        const state = resp['state'];
+        switch (state) {
+          case 1:
+            // download error
+            if (resource.pom?.length > 0) {
+              Swal.fire({
+                icon: 'error',
+                title: t('sys.api.errorTip'),
+                width: exceptionPropWidth(),
+                html: '<pre class="api-exception">' + resp['exception'] + 
'</pre>',
+                focusConfirm: false,
+              });
+            } else {
+              Swal.fire('Failed', t('flink.resource.jarFileErrorTip'), 
'error');
+            }
+            break;
+          case 2:
+            if (values.resourceType == ResourceTypeEnum.FLINK_APP) {
+              Swal.fire('Failed', t('flink.resource.mainNullTip'), 'error');
+            }
+            if (values.resourceType == ResourceTypeEnum.CONNECTOR) {
+              Swal.fire('Failed', t('flink.resource.connectorInvalidTip'), 
'error');
+            }
+            break;
+          case 3:
+            Swal.fire(
+              'Failed',
+              t('flink.resource.connectorInfoErrorTip').concat(': 
').concat(resp['name']),
+              'error',
+            );
+            break;
+          case 4:
+            Swal.fire('Failed', t('flink.resource.connectorExistsTip'), 
'error');
+            break;
+          case 5:
+            Swal.fire('Failed', t('flink.resource.connectorModifyTip'), 
'error');
+            break;
+          case 0:
+            const connector = resp['connector'] || null;
+            setDrawerProps({ confirmLoading: true });
+            await (isUpdate.value
+              ? fetchUpdateResource({
+                  id: id,
+                  resource: resourceJson,
+                  connector: connector,
+                  ...values,
+                })
+              : fetchAddResource({ resource: resourceJson, connector: 
connector, ...values }));
+            unref(resourceRef)?.setDefaultValue({});
+            closeDrawer();
+            emit('success', isUpdate.value);
+            break;
+          default:
+            break;
+        }
+        await resetFields();
       }
-
-      setDrawerProps({ confirmLoading: true });
-      await (isUpdate.value
-        ? fetchUpdateResource({ id: resourceId.value, resource: resourceJson, 
...values })
-        : fetchAddResource({ resource: resourceJson, ...values }));
-      unref(resourceRef)?.setDefaultValue({});
-      resetFields();
-      closeDrawer();
-      emit('success', isUpdate.value);
     } finally {
       setDrawerProps({ confirmLoading: false });
     }
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/resource/useResourceRender.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/resource/useResourceRender.tsx
index 57b2c077b..1a35f1b22 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/resource/useResourceRender.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/resource/useResourceRender.tsx
@@ -16,8 +16,10 @@
  */
 import { Select, Tag } from 'ant-design-vue';
 import { useI18n } from '/@/hooks/web/useI18n';
-import { ResourceTypeEnum } from '/@/views/flink/resource/resource.data';
+import { EngineTypeEnum, ResourceTypeEnum } from 
'/@/views/flink/resource/resource.data';
+
 import flinkAppSvg from '/@/assets/icons/flink2.svg';
+import sparkSvg from '/@/assets/icons/spark.svg';
 import connectorSvg from '/@/assets/icons/connector.svg';
 import udxfSvg from '/@/assets/icons/fx.svg';
 import normalJarSvg from '/@/assets/icons/jar.svg';
@@ -61,6 +63,39 @@ export const renderResourceType = ({ model }) => {
   );
 };
 
+/* render resource type label */
+export const renderEngineType = ({ model }) => {
+  const renderOptions = () => {
+    const options = [
+      { label: 'Apache Flink', value: EngineTypeEnum.FLINK, disabled: false, 
src: flinkAppSvg },
+      { label: 'Apache Spark', value: EngineTypeEnum.SPARK, disabled: true, 
src: sparkSvg },
+    ];
+    return options.map(({ label, value, disabled, src }) => {
+      return (
+        <Select.Option key={value} label={label} disabled={disabled}>
+          <div>
+            <img src={src} style="display: inline-block; width: 20px; height: 
20px"></img>
+            <span style="vertical-align: middle; margin-left: 
5px;">{label}</span>
+          </div>
+        </Select.Option>
+      );
+    });
+  };
+
+  return (
+    <div>
+      <Select
+        allow-clear
+        placeholder={t('flink.resource.engineTypePlaceholder')}
+        value={model.engineType}
+        onChange={(value) => (model.engineType = value)}
+      >
+        {renderOptions()}
+      </Select>
+    </div>
+  );
+};
+
 export const renderStreamParkResourceGroup = ({ model, resources }) => {
   const renderOptions = () => {
     console.log('resources', resources);
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/bean/LoadStatusFailedException.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/bean/LoadStatusFailedException.java
index 77617b7a0..afb6b49ab 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/bean/LoadStatusFailedException.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/bean/LoadStatusFailedException.java
@@ -22,7 +22,7 @@ import java.util.Map;
 
 public class LoadStatusFailedException extends IOException {
 
-  static final long serialVersionUID = 1L;
+  private static final long serialVersionUID = 1L;
   private final Map<String, Object> response;
   private boolean reCreateLabel;
 
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 2b07cbc74..03eb83839 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -43,6 +43,7 @@ import javax.annotation.{Nonnull, Nullable}
 
 import java.io.File
 import java.util
+import java.util.{List => JavaList, Set => JavaSet}
 
 import scala.collection.convert.ImplicitConversions._
 import scala.collection.mutable.ArrayBuffer
@@ -161,6 +162,10 @@ object MavenTool extends Logger {
     buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
   }
 
+  @throws[Exception]
+  def resolveArtifacts(mavenArtifact: Artifact): JavaList[File] = 
resolveArtifacts(
+    Set(mavenArtifact))
+
   /**
    * Resolve the collectoin of artifacts, Artifacts will be download to 
ConfigConst.MAVEN_LOCAL_DIR
    * if necessary. notes: Only compile scope dependencies will be resolved.
@@ -171,13 +176,16 @@ object MavenTool extends Logger {
    *   jar File Object of resolved artifacts
    */
   @throws[Exception]
-  def resolveArtifacts(mavenArtifacts: Set[Artifact]): Set[File] = {
-    if (mavenArtifacts == null) Set.empty[File];
+  def resolveArtifacts(mavenArtifacts: JavaSet[Artifact]): JavaList[File] = {
+    if (mavenArtifacts == null) List.empty[File]
     else {
       val (repoSystem, session) = getMavenEndpoint()
       val artifacts = mavenArtifacts.map(
         e => {
-          new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", 
e.version)
+          val artifact =
+            new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", 
e.version)
+          artifact.getProperties
+          artifact
         })
       logInfo(s"start resolving dependencies: ${artifacts.mkString}")
 
@@ -201,7 +209,7 @@ object MavenTool extends Logger {
       repoSystem
         .resolveArtifacts(session, artReqs)
         .map(_.getArtifact.getFile)
-        .toSet
+        .toList
     }
   }
 
@@ -236,7 +244,7 @@ object MavenTool extends Logger {
     (repoSystem, session)
   }
 
-  class ShadeFilter extends Filter {
+  private[this] class ShadeFilter extends Filter {
     override def canFilter(jar: File): Boolean = true
 
     override def isFiltered(name: String): Boolean = {
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 92c1d9199..b9167d437 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -28,6 +28,8 @@ import org.apache.commons.codec.digest.DigestUtils
 
 import java.io.{File, FileInputStream, IOException}
 
+import scala.collection.convert.ImplicitConversions._
+
 /** Building pipeline for flink yarn application mode */
 class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildRequest)
   extends BuildPipeline {
@@ -59,7 +61,7 @@ class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildReques
           case DevelopmentMode.FLINK_SQL =>
             val mavenArts = 
MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
             mavenArts.map(_.getAbsolutePath) ++ 
request.dependencyInfo.extJarLibs
-          case _ => Set[String]()
+          case _ => List[String]()
         }
       }.getOrElse(throw getError.exception)
 
diff --git 
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
 
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
index 7b0803c45..2bd91b8a7 100644
--- 
a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
@@ -26,6 +26,7 @@ import org.scalatest.wordspec.AnyWordSpec
 import java.io.File
 import java.util.jar.JarFile
 
+import scala.collection.convert.ImplicitConversions._
 import scala.language.postfixOps
 
 class MavenToolSpec extends AnyWordSpec with BeforeAndAfterAll with Matchers {

Reply via email to