This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new 97cc84c77 [Bug] deploy flink job on yarn, get state bug fixed. (#3329)
97cc84c77 is described below

commit 97cc84c77695814d311bd495921095d0c1405856
Author: benjobs <[email protected]>
AuthorDate: Sun Nov 12 00:17:59 2023 +0800

    [Bug] deploy flink job on yarn, get state bug fixed. (#3329)
    
    * [Improve] resolve maven artifacts improvement
    
    * [Bug] extract programArgs bug fixed
    
    * [Improve] job maven dependency support exclusion
    
    * [Improve] maven dependency check improvement
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../streampark/common/util/PropertiesUtils.scala   |  13 ++-
 .../apache/streampark/common/util/YarnUtils.scala  |  11 +-
 .../common/util/PropertiesUtilsTestCase.scala      |  16 ++-
 .../console/core/bean/MavenDependency.java         | 101 +++++++++++++++++++
 .../console/core/bean/MavenExclusion.java          |  48 +++++----
 .../streampark/console/core/bean/MavenPom.java     |  93 +++++++++++++++++
 .../console/core/entity/Application.java           | 112 ++-------------------
 .../streampark/console/core/entity/FlinkSql.java   |   9 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java |  81 +++++++++------
 .../core/service/impl/ApplicationServiceImpl.java  |   9 +-
 .../console/core/task/FlinkRESTAPIWatcher.java     |   3 -
 .../components/Table/src/components/HeaderCell.vue |   2 +-
 .../src/views/flink/app/components/Dependency.vue  |  18 ++--
 .../flink/app/hooks/useCreateAndEditSchema.ts      |  10 +-
 .../src/views/flink/app/utils/Pom.ts               |  77 +++++---------
 .../flink/client/trait/FlinkClientTrait.scala      |  47 +++++----
 .../streampark/flink/packer/maven/Artifact.scala   |   7 +-
 .../{DependencyInfo.scala => MavenArtifact.scala}  |  10 +-
 .../streampark/flink/packer/maven/MavenTool.scala  | 102 +++++++++++--------
 .../flink/packer/pipeline/BuildRequest.scala       |  16 +--
 .../impl/FlinkK8sApplicationBuildPipeline.scala    |   2 +-
 .../streampark/flink/packer/MavenToolSpec.scala    |   4 +-
 22 files changed, 467 insertions(+), 324 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 3098daa5d..35facd89f 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -322,7 +322,18 @@ object PropertiesUtils extends Logger {
               }
             }
             programArgs += value.substring(1, value.length - 1)
-          case _ => programArgs += v
+          case _ =>
+            val regexp = "(.*)='(.*)'$"
+            if (v.matches(regexp)) {
+              programArgs += v.replaceAll(regexp, "$1=$2")
+            } else {
+              val regexp = "(.*)=\"(.*)\"$"
+              if (v.matches(regexp)) {
+                programArgs += v.replaceAll(regexp, "$1=$2")
+              } else {
+                programArgs += v
+              }
+            }
         }
       }
     }
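
[Editor's note] The new branch above strips one level of matching quotes from key='value' and key="value" arguments before appending them to programArgs. A minimal standalone sketch of the same rule, assuming nothing beyond the two regexes in the patch (the unquote helper is illustrative, not part of the commit):

    object ArgUnquoteSketch {
      // Strip one level of matching quotes around the value of a key=value pair,
      // mirroring the two-regex fallback added to PropertiesUtils.extractArguments.
      def unquote(v: String): String = {
        val singleQuoted = "(.*)='(.*)'$"
        val doubleQuoted = "(.*)=\"(.*)\"$"
        if (v.matches(singleQuoted)) v.replaceAll(singleQuoted, "$1=$2")
        else if (v.matches(doubleQuoted)) v.replaceAll(doubleQuoted, "$1=$2")
        else v
      }

      def main(args: Array[String]): Unit = {
        println(unquote("--mysql-conf database-name='employees'")) // ...database-name=employees
        println(unquote("table-name=\"employees\""))               // table-name=employees
        println(unquote("bucket=1"))                               // unchanged
      }
    }
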
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 83db1d78c..d20b7bb6e 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -30,6 +30,7 @@ import org.apache.http.client.methods.HttpGet
 import org.apache.http.client.protocol.HttpClientContext
 import org.apache.http.impl.client.HttpClients
 
+import java.io.IOException
 import java.net.InetAddress
 import java.security.PrivilegedExceptionAction
 import java.util
@@ -257,20 +258,19 @@ object YarnUtils extends Logger {
    *   url
    * @return
    */
+  @throws[IOException]
   def restRequest(url: String): String = {
     if (url == null) return null
-
     url match {
       case u if u.matches("^http(|s)://.*") =>
         Try(request(url)) match {
           case Success(v) => v
           case Failure(e) =>
             if (hasYarnHttpKerberosAuth) {
-              logError(s"yarnUtils authRestRequest error, url: $u, detail: $e")
+              throw new IOException(s"yarnUtils authRestRequest error, url: 
$u, detail: $e")
             } else {
-              logError(s"yarnUtils restRequest error, url: $u, detail: $e")
+              throw new IOException(s"yarnUtils restRequest error, url: $u, 
detail: $e")
             }
-            null
         }
       case _ =>
         Try(request(s"${getRMWebAppURL()}/$url")) match {
@@ -281,8 +281,7 @@ object YarnUtils extends Logger {
             } match {
               case Success(v) => v
               case Failure(e) =>
-                logError(s"yarnUtils restRequest retry 5 times all failed. 
detail: $e")
-                null
+                throw new IOException(s"yarnUtils restRequest retry 5 times 
all failed. detail: $e")
             }
         }
     }
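
[Editor's note] YarnUtils.restRequest previously logged failures and returned null; it now declares @throws[IOException] and propagates the error, so callers must handle the exception rather than null-check (FlinkRESTAPIWatcher below drops its null guard accordingly). A hedged sketch of a caller under the new contract (handleYarnState is illustrative):

    import org.apache.streampark.common.util.YarnUtils

    import java.io.IOException
    import scala.util.{Failure, Success, Try}

    // Consume restRequest under the throwing contract instead of null-checking.
    def handleYarnState(url: String): Option[String] =
      Try(YarnUtils.restRequest(url)) match {
        case Success(body) => Option(body)          // still None when url == null
        case Failure(e: IOException) => None        // request and its retries failed; log upstream
        case Failure(e) => throw e
      }
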
diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index 995cfd2ec..827d0f11c 100644
--- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -24,8 +24,20 @@ import scala.language.postfixOps
 class PropertiesUtilsTestCase {
 
   @Test def testExtractProgramArgs(): Unit = {
-    val args =
-      "mysql-sync-database \n--database employees \n--mysql-conf 
hostname=127.0.0.1 \n--mysql-conf port=3306 \n--mysql-conf username=root 
\n--mysql-conf password=123456 \n--mysql-conf database-name=employees 
\n--including-tables 'test|test.*' \n--sink-conf fenodes=127.0.0.1:8030 
\n--sink-conf username=root \n--sink-conf password= \n--sink-conf 
jdbc-url=jdbc:mysql://127.0.0.1:9030 \n--sink-conf 
sink.label-prefix=label\n--table-conf replication_num=1 "
+    val args = "mysql-sync-table \n" +
+      "--warehouse hdfs:///paimon \n" +
+      "--database test_db \n" +
+      "--table test_table \n" +
+      "--mysql-conf hostname=localhost \n" +
+      "--mysql-conf username=root \n" +
+      "--mysql-conf password=123456 \n" +
+      "--mysql-conf database-name='employees' \n" +
+      "--mysql-conf table-name='employees' \n" +
+      "--catalog-conf metastore=hive \n" +
+      "--catalog-conf uri=thrift://localhost:9083 \n" +
+      "--table-conf bucket=1 \n" +
+      "--table-conf changelog-producer=input \n" +
+      "--table-conf sink.parallelism=1"
     val programArgs = new ArrayBuffer[String]()
     programArgs ++= PropertiesUtils.extractArguments(args)
     println(programArgs)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java
new file mode 100644
index 000000000..ec6bb14a8
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.FileUtils;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.flink.packer.maven.Artifact;
+import org.apache.streampark.flink.packer.maven.MavenArtifact;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.Data;
+import lombok.SneakyThrows;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Data
+public class MavenDependency {
+
+  private Set<MavenPom> pom = Collections.emptySet();
+
+  private Set<String> jar = Collections.emptySet();
+
+  @SneakyThrows
+  public static MavenDependency of(String dependency) {
+    if (StringUtils.isNotBlank(dependency)) {
+      return JacksonUtils.read(dependency, MavenDependency.class);
+    }
+    return new MavenDependency();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    }
+
+    if (that == null || getClass() != that.getClass()) {
+      return false;
+    }
+
+    MavenDependency thatDep = (MavenDependency) that;
+
+    if (this.pom.size() != thatDep.pom.size()
+        || this.jar.size() != thatDep.jar.size()
+        || !this.pom.containsAll(thatDep.pom)) {
+      return false;
+    }
+
+    File localJar = WebUtils.getAppTempDir();
+    File localUploads = new File(Workspace.local().APP_UPLOADS());
+    for (String jarName : jar) {
+      if (!thatDep.jar.contains(jarName)
+          || !FileUtils.equals(new File(localJar, jarName), new File(localUploads, jarName))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  public MavenArtifact toMavenArtifact() {
+    List<Artifact> mvnArts =
+        this.pom.stream()
+            .map(
+                pom ->
+                    new Artifact(
+                        pom.getGroupId(),
+                        pom.getArtifactId(),
+                        pom.getVersion(),
+                        pom.getClassifier(),
+                        pom.toExclusionString()))
+            .collect(Collectors.toList());
+    List<String> extJars =
+        this.jar.stream()
+            .map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
+            .collect(Collectors.toList());
+    return new MavenArtifact(mvnArts, extJars);
+  }
+}
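
[Editor's note] MavenDependency.equals is intentionally stronger than field equality: the POM sets must match as sets, and every uploaded jar must also be byte-identical on disk between the temp dir and the uploads dir, so a re-uploaded jar with the same name but new content still registers as a change and triggers a rebuild. An illustrative Scala restatement of that contract (all names below are stand-ins):

    // Two dependency specs are equal iff the POM sets match exactly and every
    // named jar is present on both sides and identical on disk (sameOnDisk is
    // a stand-in for FileUtils.equals on the temp and uploads directories).
    def dependencyEquals(
        thisPom: Set[String], thatPom: Set[String],
        thisJar: Set[String], thatJar: Set[String],
        sameOnDisk: String => Boolean): Boolean =
      thisPom == thatPom &&
        thisJar.size == thatJar.size &&
        thisJar.forall(j => thatJar.contains(j) && sameOnDisk(j))
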
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java
similarity index 52%
copy from streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
copy to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java
index c80e416e6..9ab025f36 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java
@@ -15,29 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.packer.maven
+package org.apache.streampark.console.core.bean;
 
-import java.util.{List => JavaList}
+import lombok.Data;
 
-import scala.collection.JavaConversions._
+import java.util.Objects;
 
-/**
- * @param mavenArts
- *   collection of maven artifacts
- * @param extJarLibs
- *   collection of jar lib paths, which elements can be a directory or file path.
- */
-case class DependencyInfo(mavenArts: Set[Artifact] = Set(), extJarLibs: Set[String] = Set()) {
+@Data
+public class MavenExclusion {
 
-  def this(mavenArts: JavaList[Artifact], extJarLibs: JavaList[String]) {
-    this(mavenArts.toSet, extJarLibs.toSet)
-  }
+  private String groupId;
 
-  def merge(jarLibs: Set[String]): DependencyInfo =
-    if (jarLibs != null) DependencyInfo(mavenArts, extJarLibs ++ jarLibs) else this.copy()
+  private String artifactId;
 
-}
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
-object DependencyInfo {
-  def empty: DependencyInfo = new DependencyInfo()
+    MavenExclusion that = (MavenExclusion) o;
+    if (this.groupId == null || that.groupId == null) {
+      return false;
+    }
+    if (this.artifactId == null || that.artifactId == null) {
+      return false;
+    }
+    return this.groupId.equals(that.groupId) && this.artifactId.equals(that.artifactId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(groupId, artifactId);
+  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java
new file mode 100644
index 000000000..21164061c
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.Data;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Data
+public class MavenPom {
+  private String groupId;
+  private String artifactId;
+  private String version;
+  private String classifier;
+  private Set<MavenExclusion> exclusions = Collections.emptySet();
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    MavenPom that = (MavenPom) o;
+
+    boolean basic =
+        this.groupId.equals(that.groupId)
+            && this.artifactId.equals(that.artifactId)
+            && this.version.equals(that.version);
+
+    boolean classify =
+        StringUtils.isAllBlank(this.classifier, that.classifier)
+            || this.classifier.equals(that.classifier);
+
+    if (basic && classify) {
+      Set<MavenExclusion> thisEx =
+          this.exclusions == null ? Collections.emptySet() : this.exclusions;
+      Set<MavenExclusion> thatEx =
+          that.exclusions == null ? Collections.emptySet() : that.exclusions;
+      return thisEx.size() == thatEx.size() && thisEx.containsAll(thatEx);
+    }
+    return false;
+  }
+
+  public String artifactName() {
+    if (StringUtils.isBlank(classifier)) {
+      return String.format("%s-%s.jar", artifactId, version);
+    }
+    return String.format("%s-%s-%s.jar", artifactId, version, classifier);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(groupId, artifactId, version, classifier);
+  }
+
+  @Override
+  public String toString() {
+    return groupId + ":" + artifactId + ":" + version + getClassifier();
+  }
+
+  public Set<String> toExclusionString() {
+    return this.exclusions.stream()
+        .map(x -> String.format("%s:%s", x.getGroupId(), x.getArtifactId()))
+        .collect(Collectors.toSet());
+  }
+
+  public String getClassifier() {
+    return StringUtils.isBlank(classifier) ? "" : ":" + classifier;
+  }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 5b3aa4db2..d963c5477 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -26,20 +26,17 @@ import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.FsOperator;
-import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.util.JacksonUtils;
 import org.apache.streampark.console.base.util.ObjectUtils;
-import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.bean.AppControl;
+import org.apache.streampark.console.core.bean.MavenDependency;
 import org.apache.streampark.console.core.enums.FlinkAppState;
 import org.apache.streampark.console.core.enums.ReleaseState;
 import org.apache.streampark.console.core.enums.ResourceFrom;
 import org.apache.streampark.console.core.metrics.flink.JobsOverview;
 import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
 import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates;
-import org.apache.streampark.flink.packer.maven.Artifact;
-import org.apache.streampark.flink.packer.maven.DependencyInfo;
+import org.apache.streampark.flink.packer.maven.MavenArtifact;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -49,24 +46,19 @@ import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.Data;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Nonnull;
 
-import java.io.File;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import static org.apache.streampark.console.core.enums.FlinkAppState.of;
 
@@ -487,13 +479,13 @@ public class Application implements Serializable {
 
   @JsonIgnore
   @SneakyThrows
-  public Dependency getDependencyObject() {
-    return Dependency.toDependency(this.dependency);
+  public MavenDependency getMavenDependency() {
+    return MavenDependency.of(this.dependency);
   }
 
   @JsonIgnore
-  public DependencyInfo getDependencyInfo() {
-    return Application.Dependency.toDependency(getDependency()).toJarPackDeps();
+  public MavenArtifact getMavenArtifact() {
+    return getMavenDependency().toMavenArtifact();
   }
 
   @JsonIgnore
@@ -650,65 +642,6 @@ public class Application implements Serializable {
     return ExecutionMode.YARN_PER_JOB.equals(mode) || ExecutionMode.YARN_APPLICATION.equals(mode);
   }
 
-  @Data
-  public static class Dependency {
-    private List<Pom> pom = Collections.emptyList();
-    private List<String> jar = Collections.emptyList();
-
-    @SneakyThrows
-    public static Dependency toDependency(String dependency) {
-      if (Utils.notEmpty(dependency)) {
-        return JacksonUtils.read(dependency, new TypeReference<Dependency>() {});
-      }
-      return new Dependency();
-    }
-
-    public boolean isEmpty() {
-      return pom.isEmpty() && jar.isEmpty();
-    }
-
-    public boolean eq(Dependency other) {
-      if (other == null) {
-        return false;
-      }
-      if (this.isEmpty() && other.isEmpty()) {
-        return true;
-      }
-
-      if (this.pom.size() != other.pom.size() || this.jar.size() != other.jar.size()) {
-        return false;
-      }
-      File localJar = WebUtils.getAppTempDir();
-      File localUploads = new File(Workspace.local().APP_UPLOADS());
-      HashSet<String> otherJars = new HashSet<>(other.jar);
-      for (String jarName : jar) {
-        if (!otherJars.contains(jarName)
-            || !FileUtils.equals(new File(localJar, jarName), new File(localUploads, jarName))) {
-          return false;
-        }
-      }
-      return new HashSet<>(pom).containsAll(other.pom);
-    }
-
-    public DependencyInfo toJarPackDeps() {
-      List<Artifact> mvnArts =
-          this.pom.stream()
-              .map(
-                  pom ->
-                      new Artifact(
-                          pom.getGroupId(),
-                          pom.getArtifactId(),
-                          pom.getVersion(),
-                          pom.getClassifier()))
-              .collect(Collectors.toList());
-      List<String> extJars =
-          this.jar.stream()
-              .map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
-              .collect(Collectors.toList());
-      return new DependencyInfo(mvnArts, extJars);
-    }
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -724,37 +657,4 @@ public class Application implements Serializable {
   public int hashCode() {
     return Objects.hash(id);
   }
-
-  @Data
-  public static class Pom {
-    private String groupId;
-    private String artifactId;
-    private String version;
-    private String classifier;
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      return this.toString().equals(o.toString());
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(groupId, artifactId, version, classifier);
-    }
-
-    @Override
-    public String toString() {
-      return groupId + ":" + artifactId + ":" + version + getClassifier();
-    }
-
-    private String getClassifier() {
-      return StringUtils.isEmpty(classifier) ? "" : ":" + classifier;
-    }
-  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
index 57be322d7..d944e14f8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java
@@ -18,6 +18,7 @@
 package org.apache.streampark.console.core.entity;
 
 import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.console.core.bean.MavenDependency;
 import org.apache.streampark.console.core.enums.ChangedType;
 
 import com.baomidou.mybatisplus.annotation.IdType;
@@ -82,12 +83,10 @@ public class FlinkSql {
     // 1) determine if sql statement has changed
    boolean sqlDifference = !this.getSql().trim().equals(target.getSql().trim());
     // 2) determine if dependency has changed
-    Application.Dependency thisDependency =
-        Application.Dependency.toDependency(this.getDependency());
-    Application.Dependency targetDependency =
-        Application.Dependency.toDependency(target.getDependency());
+    MavenDependency thisDependency = MavenDependency.of(this.getDependency());
+    MavenDependency targetDependency = MavenDependency.of(target.getDependency());
 
-    boolean depDifference = !thisDependency.eq(targetDependency);
+    boolean depDifference = !thisDependency.equals(targetDependency);
     if (sqlDifference && depDifference) {
       return ChangedType.ALL;
     }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 07c66ff05..33afcf881 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -50,6 +50,7 @@ import org.apache.streampark.console.core.service.MessageService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.packer.docker.DockerConf;
+import org.apache.streampark.flink.packer.maven.Artifact;
 import org.apache.streampark.flink.packer.maven.MavenTool;
 import org.apache.streampark.flink.packer.pipeline.BuildPipeline;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
@@ -71,6 +72,7 @@ import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sSessionBuildPipe
 import org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline;
 import org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -86,11 +88,15 @@ import org.springframework.transaction.annotation.Transactional;
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -164,7 +170,7 @@ public class AppBuildPipeServiceImpl
     pipeline.registerWatcher(
         new PipeWatcher() {
           @Override
-          public void onStart(PipeSnapshot snapshot) {
+          public void onStart(PipeSnapshot snapshot) throws Exception {
            AppBuildPipeline buildPipeline =
                AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId());
             saveEntity(buildPipeline);
@@ -301,7 +307,7 @@ public class AppBuildPipeServiceImpl
                 mainClass,
                 yarnProvidedPath,
                 app.getDevelopmentMode(),
-                app.getDependencyInfo());
+                app.getMavenArtifact());
         log.info("Submit params to building pipeline : {}", yarnAppRequest);
         return FlinkYarnApplicationBuildPipeline.of(yarnAppRequest);
       case YARN_PER_JOB:
@@ -316,7 +322,7 @@ public class AppBuildPipeServiceImpl
                 app.getExecutionModeEnum(),
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
-                app.getDependencyInfo());
+                app.getMavenArtifact());
         log.info("Submit params to building pipeline : {}", buildRequest);
         return FlinkRemoteBuildPipeline.of(buildRequest);
       case KUBERNETES_NATIVE_SESSION:
@@ -329,7 +335,7 @@ public class AppBuildPipeServiceImpl
                 app.getExecutionModeEnum(),
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
-                app.getDependencyInfo(),
+                app.getMavenArtifact(),
                 app.getClusterId(),
                 app.getK8sNamespace());
         log.info("Submit params to building pipeline : {}", 
k8sSessionBuildRequest);
@@ -344,7 +350,7 @@ public class AppBuildPipeServiceImpl
                 app.getExecutionModeEnum(),
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
-                app.getDependencyInfo(),
+                app.getMavenArtifact(),
                 app.getClusterId(),
                 app.getK8sNamespace(),
                 app.getFlinkImage(),
@@ -364,7 +370,7 @@ public class AppBuildPipeServiceImpl
     }
   }
 
-  private void prepareJars(Application app) {
+  private void prepareJars(Application app) throws IOException {
     File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
     if (!localUploadDIR.exists()) {
       localUploadDIR.mkdirs();
@@ -373,8 +379,8 @@ public class AppBuildPipeServiceImpl
     FsOperator localFS = FsOperator.lfs();
     // 1. copy jar to local upload dir
     if (app.isFlinkSqlJob() || app.isUploadJob()) {
-      if (!app.getDependencyObject().getJar().isEmpty()) {
-        for (String jar : app.getDependencyObject().getJar()) {
+      if (!app.getMavenDependency().getJar().isEmpty()) {
+        for (String jar : app.getMavenDependency().getJar()) {
           File localJar = new File(WebUtils.getAppTempDir(), jar);
           File localUploadJar = new File(localUploadDIR, jar);
           if (!localJar.exists() && !localUploadJar.exists()) {
@@ -398,21 +404,10 @@ public class AppBuildPipeServiceImpl
        checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR);
 
         // 2) copy jar to local $app_home/lib
-        boolean cleanUpload = false;
         File libJar = new File(app.getLocalAppLib(), app.getJar());
-        if (!localFS.exists(app.getLocalAppLib())) {
-          cleanUpload = true;
-        } else {
-          if (libJar.exists()) {
-            if (!FileUtils.equals(localJar, libJar)) {
-              cleanUpload = true;
-            }
-          } else {
-            cleanUpload = true;
-          }
-        }
-
-        if (cleanUpload) {
+        if (!localFS.exists(app.getLocalAppLib())
+            || !libJar.exists()
+            || !FileUtils.equals(localJar, libJar)) {
           localFS.mkCleanDirs(app.getLocalAppLib());
          localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib());
         }
@@ -421,22 +416,48 @@ public class AppBuildPipeServiceImpl
         if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
           List<File> jars = new ArrayList<>(0);
 
-          // 1) user jar
+          // 1). user jar
           jars.add(libJar);
 
           // 2). jar dependency
-          app.getDependencyObject()
-              .getJar()
-              .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
+          app.getMavenDependency().getJar().forEach(jar -> jars.add(new File(localUploadDIR, jar)));
 
           // 3). pom dependency
-          if (!app.getDependencyInfo().mavenArts().isEmpty()) {
-            jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));
+          if (!app.getMavenDependency().getPom().isEmpty()) {
+            Set<Artifact> artifacts =
+                app.getMavenDependency().getPom().stream()
+                    .filter(x -> !new File(localUploadDIR, x.artifactName()).exists())
+                    .map(
+                        pom ->
+                            new Artifact(
+                                pom.getGroupId(),
+                                pom.getArtifactId(),
+                                pom.getVersion(),
+                                pom.getClassifier(),
+                                pom.toExclusionString()))
+                    .collect(Collectors.toSet());
+            Set<File> mavenArts = MavenTool.resolveArtifactsAsJava(artifacts);
+            jars.addAll(mavenArts);
+          }
+
+          // 4). local uploadDIR to hdfs uploadsDIR
+          String hdfsUploadDIR = Workspace.remote().APP_UPLOADS();
+          for (File jarFile : jars) {
+            String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName();
+            if (!fsOperator.exists(hdfsUploadPath)) {
+              fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+            } else {
+              InputStream inputStream = Files.newInputStream(jarFile.toPath());
+              if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) {
+                fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR);
+              }
+            }
           }
 
+          // 5). copy jars to $hdfs_app_home/lib
           fsOperator.mkCleanDirs(app.getAppLib());
-          // 4). upload jars to appLibDIR
-          jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib()));
+          jars.forEach(
+              jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib()));
         }
       } else {
         String appHome = app.getAppHome();
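
[Editor's note] Steps 4 and 5 above introduce an HDFS upload cache: a jar is uploaded only when it is missing remotely or its MD5 differs from the local copy, and the app lib dir is then populated by an HDFS-side copy instead of a fresh upload. A hedged sketch of the gate (the fsOperator calls mirror the ones in the patch; uploadIfChanged is illustrative, and unlike the patch it closes the digest stream):

    import org.apache.streampark.common.fs.FsOperator
    import org.apache.commons.codec.digest.DigestUtils

    import java.io.File
    import java.nio.file.Files

    // Upload only when the remote copy is missing or stale (different MD5).
    def uploadIfChanged(fsOperator: FsOperator, jar: File, hdfsUploadDir: String): Unit = {
      val target = s"$hdfsUploadDir/${jar.getName}"
      val stale = !fsOperator.exists(target) || {
        val in = Files.newInputStream(jar.toPath)
        try DigestUtils.md5Hex(in) != fsOperator.fileMd5(target)
        finally in.close()
      }
      if (stale) fsOperator.upload(jar.getAbsolutePath, hdfsUploadDir)
    }
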
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index e894bd626..99da52420 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -40,6 +40,7 @@ import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.base.util.CommonUtils;
 import org.apache.streampark.console.base.util.ObjectUtils;
 import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.console.core.bean.MavenDependency;
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationConfig;
@@ -852,12 +853,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     application.setRelease(ReleaseState.NEED_RELEASE.get());
 
     if (application.isUploadJob()) {
-      Application.Dependency thisDependency =
-          Application.Dependency.toDependency(appParam.getDependency());
-      Application.Dependency targetDependency =
-          Application.Dependency.toDependency(application.getDependency());
+      MavenDependency thisDependency = MavenDependency.of(appParam.getDependency());
+      MavenDependency targetDependency = MavenDependency.of(application.getDependency());
 
-      if (!thisDependency.eq(targetDependency)) {
+      if (!thisDependency.equals(targetDependency)) {
         application.setDependency(appParam.getDependency());
         application.setBuild(true);
      } else if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index b734a7a03..fab29a74e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -738,9 +738,6 @@ public class FlinkRESTAPIWatcher {
 
  private <T> T yarnRestRequest(String url, Class<T> clazz) throws IOException {
     String result = YarnUtils.restRequest(url);
-    if (null == result) {
-      return null;
-    }
     return JacksonUtils.read(result, clazz);
   }
 
diff --git a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
index 36ab854c5..35c080269 100644
--- a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
@@ -22,7 +22,7 @@
     props: {
       column: {
         type: Object as PropType<BasicColumn>,
-        default: () => ({}) as BasicColumn,
+        default: () => ({} as BasicColumn),
       },
     },
     setup(props) {
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
index bf6aa9543..5c9686a45 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue
@@ -33,6 +33,7 @@
   import { fetchUpload } from '/@/api/flink/app/app';
   import { fetchUploadJars } from '/@/api/flink/app/flinkHistory';
   import UploadJobJar from './UploadJobJar.vue';
+  import { fetchFlinkEnv } from '/@/api/flink/setting/flinkEnv';
 
   interface DependencyType {
     artifactId: string;
@@ -89,10 +90,16 @@
       Swal.fire('Failed', t('flink.app.dependencyError'), 'error');
       return;
     }
-    const scalaVersion = props.flinkEnvs.find((v) => v.id === versionId)?.scalaVersion;
+
+    let flinkEnv = props.flinkEnvs || [];
+    if (props.flinkEnvs?.length == 0) {
+      flinkEnv = await fetchFlinkEnv();
+    }
+    const scalaVersion = flinkEnv.find((v) => v.id === versionId)?.scalaVersion;
     if (props.value == null || props.value.trim() === '') {
       return;
     }
+
     const groupExp = /<groupId>([\s\S]*?)<\/groupId>/;
     const artifactExp = /<artifactId>([\s\S]*?)<\/artifactId>/;
     const versionExp = /<version>([\s\S]*?)<\/version>/;
@@ -125,26 +132,23 @@
               mvnPom.classifier = classifier;
             }
             const id = getId(mvnPom);
-            const pomExclusion = {};
+            const pomExclusion = [];
             if (exclusion != null) {
               const exclusions = exclusion.split('<exclusion>');
               exclusions.forEach((e) => {
                 if (e != null && e.length > 0) {
                  const e_group = e.match(groupExp) ? groupExp.exec(e)![1].trim() : null;
                  const e_artifact = e.match(artifactExp) ? artifactExp.exec(e)![1].trim() : null;
-                  const id = e_group + '_' + e_artifact;
-                  pomExclusion[id] = {
+                  pomExclusion.push({
                     groupId: e_group,
                     artifactId: e_artifact,
-                  };
+                  });
                 }
               });
             }
             mvnPom.exclusions = pomExclusion;
             dependency.pom[id] = mvnPom;
           }
-        } else {
-          console.error('dependency error...');
         }
       });
 
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 752ad30e0..2a1a3b915 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -597,7 +597,13 @@ export const useCreateAndEditSchema = (
       },
     ];
   });
+
   onMounted(async () => {
+    //get flinkEnv
+    fetchFlinkEnv().then((res) => {
+      flinkEnvs.value = res;
+    });
+
     /* Get project data */
     fetchSelect({}).then((res) => {
       projectList.value = res;
@@ -608,10 +614,6 @@ export const useCreateAndEditSchema = (
       alerts.value = res;
     });
 
-    //get flinkEnv
-    fetchFlinkEnv().then((res) => {
-      flinkEnvs.value = res;
-    });
     //get flinkCluster
     fetchFlinkCluster().then((res) => {
       flinkClusters.value = res;
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
index 87b8ab397..1e0291fa8 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts
@@ -22,65 +22,40 @@ export function toPomString(pom) {
   const classifier = pom.classifier;
   const exclusions = pom.exclusions || [];
   let exclusionString = '';
-  let pomString = '';
+  let classifierString = '';
   if (exclusions.length > 0) {
     exclusions.forEach((item) => {
       exclusionString +=
-        '        <exclusion>\n' +
-        '          <groupId>' +
+        '      <exclusion>\n' +
+        '        <groupId>' +
         item.groupId +
         '</groupId>\n' +
-        '          <artifactId>' +
+        '        <artifactId>' +
         item.artifactId +
         '</artifactId>\n' +
-        '        </exclusion>\n';
+        '      </exclusion>\n';
     });
-    pomString =
-      '  <dependency>\n' +
-      '     <groupId>' +
-      groupId +
-      '</groupId>\n' +
-      '     <artifactId>' +
-      artifactId +
-      '</artifactId>\n' +
-      '     <version>' +
-      version +
-      '</version>\n' +
-      '     <exclusions>\n' +
-      exclusionString +
-      '     </exclusions>\n' +
-      '   </dependency>';
-  } else {
-    if (classifier != null) {
-      pomString =
-        '  <dependency>\n' +
-        '    <groupId>' +
-        groupId +
-        '</groupId>\n' +
-        '    <artifactId>' +
-        artifactId +
-        '</artifactId>\n' +
-        '    <version>' +
-        version +
-        '</version>\n' +
-        '    <classifier>' +
-        classifier +
-        '</classifier>\n' +
-        '  </dependency>';
-    } else {
-      pomString =
-        '  <dependency>\n' +
-        '    <groupId>' +
-        groupId +
-        '</groupId>\n' +
-        '    <artifactId>' +
-        artifactId +
-        '</artifactId>\n' +
-        '    <version>' +
-        version +
-        '</version>\n' +
-        '  </dependency>';
-    }
+    exclusionString = '    <exclusions>\n' + exclusionString + '    </exclusions>\n';
   }
+
+  if (classifier != null) {
+    classifierString = '    <classifier>' + classifier + '</classifier>\n';
+  }
+
+  const pomString =
+    '  <dependency>\n' +
+    '    <groupId>' +
+    groupId +
+    '</groupId>\n' +
+    '    <artifactId>' +
+    artifactId +
+    '</artifactId>\n' +
+    '    <version>' +
+    version +
+    '</version>\n' +
+    classifierString +
+    exclusionString +
+    '  </dependency>';
+
   return pomString;
 }
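
[Editor's note] The TypeScript refactor above collapses three near-duplicate dependency templates into one, splicing in optional classifier and exclusions fragments. An equivalent hedged sketch of the same assembly in Scala (an illustrative helper, not project code):

    // Compose a <dependency> block, splicing optional classifier/exclusions,
    // mirroring the refactored toPomString in Pom.ts.
    def toPomString(
        groupId: String,
        artifactId: String,
        version: String,
        classifier: Option[String],
        exclusions: Seq[(String, String)]): String = {
      val classifierXml = classifier.map(c => s"    <classifier>$c</classifier>\n").getOrElse("")
      val exclusionsXml =
        if (exclusions.isEmpty) ""
        else exclusions.map { case (g, a) =>
          s"      <exclusion>\n        <groupId>$g</groupId>\n        <artifactId>$a</artifactId>\n      </exclusion>\n"
        }.mkString("    <exclusions>\n", "", "    </exclusions>\n")
      "  <dependency>\n" +
        s"    <groupId>$groupId</groupId>\n" +
        s"    <artifactId>$artifactId</artifactId>\n" +
        s"    <version>$version</version>\n" +
        classifierXml + exclusionsXml +
        "  </dependency>"
    }
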
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 58dc7f82b..6a2e8c95c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -207,28 +207,22 @@ trait FlinkClientTrait extends Logger {
       jobGraphFunc(submitRequest, flinkConfig, jarFile)
     } match {
       case Failure(e) =>
-        logWarn(
-          s"""\n
-             |[flink-submit] JobGraph Submit Plan failed, error detail:
-             |------------------------------------------------------------------
-             |${Utils.stringifyException(e)}
-             |------------------------------------------------------------------
-             |Now retry submit with RestAPI Plan ...
-             |""".stripMargin
-        )
         Try(restApiFunc(submitRequest, flinkConfig, jarFile)) match {
           case Success(r) => r
-          case Failure(e) =>
-            logError(
+          case Failure(e1) =>
+            throw new RuntimeException(
               s"""\n
-                 |[flink-submit] RestAPI Submit failed, error detail:
+                 |[flink-submit] Both JobGraph submit plan and Rest API submit plan all failed!
+                 |JobGraph Submit plan failed detail:
                  |------------------------------------------------------------------
                  |${Utils.stringifyException(e)}
                  |------------------------------------------------------------------
-                 |Both JobGraph submit plan and Rest API submit plan all failed!
-                 |""".stripMargin
-            )
-            throw e
+                 |
+                 | RestAPI Submit plan failed detail:
+                 | ------------------------------------------------------------------
+                 |${Utils.stringifyException(e1)}
+                 |------------------------------------------------------------------
+                 |""".stripMargin)
         }
       case Success(v) => v
     }
@@ -239,18 +233,23 @@ trait FlinkClientTrait extends Logger {
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {
 
-    val packageProgram = PackagedProgram.newBuilder
+    val pgkBuilder = PackagedProgram.newBuilder
       .setJarFile(jarFile)
-      .setUserClassPaths(
-        Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
-      )
      .setEntryPointClassName(
        flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
       .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-      .setArguments(flinkConfig
-        .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-        .orElse(Lists.newArrayList()): _*)
-      .build()
+      .setArguments(
+        flinkConfig
+          .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+          .orElse(Lists.newArrayList()): _*)
+    // userClassPath...
+    submitRequest.executionMode match {
+      case ExecutionMode.REMOTE | ExecutionMode.YARN_PER_JOB =>
+        pgkBuilder.setUserClassPaths(submitRequest.flinkVersion.flinkLibs)
+      case _ =>
+    }
+
+    val packageProgram = pgkBuilder.build()
 
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
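
[Editor's note] The submit path now tries the JobGraph plan first and, on failure, falls back silently to the REST API plan; only when both fail does it throw, carrying both stack traces in a single message. The pattern in isolation (a hedged sketch, not the exact trait code):

    import scala.util.{Failure, Success, Try}

    // Try plan A, fall back to plan B, fail loudly only when both fail.
    def submitWithFallback[T](jobGraphPlan: => T, restApiPlan: => T): T =
      Try(jobGraphPlan) match {
        case Success(v) => v
        case Failure(e) =>
          Try(restApiPlan) match {
            case Success(v) => v
            case Failure(e1) =>
              throw new RuntimeException(
                s"Both JobGraph and REST API submit plans failed:\n${e.getMessage}\n${e1.getMessage}")
          }
      }
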
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
index cff3c0398..1e8245228 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala
@@ -19,15 +19,17 @@ package org.apache.streampark.flink.packer.maven
 
 import org.eclipse.aether.artifact.{Artifact => AetherArtifact}
 
+import java.util.{Collections, Set => JavaSet}
 import java.util.regex.Pattern
 
 case class Artifact(
     groupId: String,
     artifactId: String,
     version: String,
-    classifier: String = null) {
+    classifier: String = null,
+    extensions: JavaSet[String] = Collections.emptySet()) {
 
-  def eq(artifact: AetherArtifact): Boolean = {
+  def filter(artifact: AetherArtifact): Boolean = {
     artifact.getGroupId match {
       case g if g == groupId =>
         artifact.getArtifactId match {
@@ -37,7 +39,6 @@ case class Artifact(
       case _ => false
     }
   }
-
 }
 
 object Artifact {
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala
similarity index 79%
rename from streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
rename to streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala
index c80e416e6..0b39f53d1 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala
@@ -27,17 +27,17 @@ import scala.collection.JavaConversions._
  * @param extJarLibs
  *   collection of jar lib paths, which elements can be a directory or file path.
  */
-case class DependencyInfo(mavenArts: Set[Artifact] = Set(), extJarLibs: Set[String] = Set()) {
+case class MavenArtifact(mavenArts: Set[Artifact] = Set(), extJarLibs: Set[String] = Set()) {
 
   def this(mavenArts: JavaList[Artifact], extJarLibs: JavaList[String]) {
     this(mavenArts.toSet, extJarLibs.toSet)
   }
 
-  def merge(jarLibs: Set[String]): DependencyInfo =
-    if (jarLibs != null) DependencyInfo(mavenArts, extJarLibs ++ jarLibs) else this.copy()
+  def merge(jarLibs: Set[String]): MavenArtifact =
+    if (jarLibs != null) MavenArtifact(mavenArts, extJarLibs ++ jarLibs) else this.copy()
 
 }
 
-object DependencyInfo {
-  def empty: DependencyInfo = new DependencyInfo()
+object MavenArtifact {
+  def empty: MavenArtifact = new MavenArtifact()
 }
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 05562a870..a56f41321 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -43,7 +43,6 @@ import javax.annotation.{Nonnull, Nullable}
 
 import java.io.File
 import java.util
-import java.util.{HashSet, Set => JavaSet}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -56,8 +55,10 @@ object MavenTool extends Logger {
 
   private[this] val excludeArtifact = List(
     Artifact.of("org.apache.flink:force-shading:*"),
+    Artifact.of("org.apache.flink:flink-shaded-force-shading:*"),
     Artifact.of("com.google.code.findbugs:jsr305:*"),
-    Artifact.of("org.apache.logging.log4j:*:*"))
+    Artifact.of("org.apache.logging.log4j:*:*")
+  )
 
   private[this] def getRemoteRepos(): List[RemoteRepository] = {
     val builder =
@@ -145,7 +146,7 @@ object MavenTool extends Logger {
       }
       req.setResourceTransformers(transformer.toList)
       // issue: https://github.com/apache/incubator-streampark/issues/2350
-      req.setFilters(List(new ShadeFilter))
+      req.setFilters(List(new ShadedFilter))
       req.setRelocators(Lists.newArrayList())
       req
     }
@@ -159,7 +160,7 @@ object MavenTool extends Logger {
   /**
    * Build a fat-jar with custom jar librarties and maven artifacts.
    *
-   * @param dependencyInfo
+   * @param artifact
    *   maven artifacts and jar libraries for building a fat-jar
    * @param outFatJarPath
    *   output paths of fat-jar, like "/streampark/workspace/233/my-fat.jar"
@@ -167,10 +168,10 @@ object MavenTool extends Logger {
   @throws[Exception]
   def buildFatJar(
       @Nullable mainClass: String,
-      @Nonnull dependencyInfo: DependencyInfo,
+      @Nonnull artifact: MavenArtifact,
       @Nonnull outFatJarPath: String): File = {
-    val jarLibs = dependencyInfo.extJarLibs
-    val arts = dependencyInfo.mavenArts
+    val jarLibs = artifact.extJarLibs
+    val arts = artifact.mavenArts
     if (jarLibs.isEmpty && arts.isEmpty) {
       throw new Exception(s"[StreamPark] streampark-packer: empty artifacts.")
     }
@@ -178,8 +179,8 @@ object MavenTool extends Logger {
     buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
   }
 
-  def resolveArtifactsAsJava(mavenArtifacts: Set[Artifact]): JavaSet[File] = resolveArtifacts(
-    mavenArtifacts).asJava
+  def resolveArtifactsAsJava(mavenArtifacts: util.Set[Artifact]): util.Set[File] = resolveArtifacts(
+    mavenArtifacts.toSet).asJava
 
   /**
   * Resolve the collectoin of artifacts, Artifacts will be download to ConfigConst.MAVEN_LOCAL_DIR
@@ -192,37 +193,55 @@ object MavenTool extends Logger {
    */
   @throws[Exception]
   def resolveArtifacts(mavenArtifacts: Set[Artifact]): Set[File] = {
-    if (mavenArtifacts == null) Set.empty[File];
-    else {
-      val (repoSystem, session) = getMavenEndpoint()
-      val artifacts = mavenArtifacts.map(
-        e => {
-          new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version)
-        })
-      logInfo(s"start resolving dependencies: ${artifacts.mkString}")
-
-      val remoteRepos = getRemoteRepos()
-      // read relevant artifact descriptor info
-      // plz don't simplify the following lambda syntax to maintain the readability of the code.
-      val resolvedArtifacts = artifacts
-        .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, null))
-        .map(artDescReq => repoSystem.readArtifactDescriptor(session, artDescReq))
-        .flatMap(_.getDependencies)
-        .filter(_.getScope == "compile")
-        .filter(x => !excludeArtifact.exists(_.eq(x.getArtifact)))
-        .map(_.getArtifact)
-
-      val mergedArtifacts = artifacts ++ resolvedArtifacts
-      logInfo(s"resolved dependencies: ${mergedArtifacts.mkString}")
-
-      // download artifacts
-      val artReqs =
-        mergedArtifacts.map(artifact => new ArtifactRequest(artifact, remoteRepos, null))
-      repoSystem
-        .resolveArtifacts(session, artReqs)
-        .map(_.getArtifact.getFile)
-        .toSet
+    if (mavenArtifacts == null) {
+      return Set.empty[File]
     }
+
+    val (repoSystem, session) = getMavenEndpoint()
+    val artifacts = mavenArtifacts.map(
+      e => {
+        new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version) -> e.extensions
+      })
+
+    logInfo(s"start resolving dependencies: ${artifacts.mkString}")
+
+    val remoteRepos = getRemoteRepos()
+    // read relevant artifact descriptor info
+    // plz don't simplify the following lambda syntax to maintain the readability of the code.
+    val dependenciesArtifacts = artifacts
+      .map(artifact => new ArtifactDescriptorRequest(artifact._1, remoteRepos, null) -> artifact._2)
+      .map(descReq => repoSystem.readArtifactDescriptor(session, descReq._1) -> descReq._2)
+      .flatMap(
+        result =>
+          result._1.getDependencies
+            .filter(
+              dep => {
+                dep.getScope match {
+                  case "compile" if 
!excludeArtifact.exists(_.filter(dep.getArtifact)) =>
+                    val ga = 
s"${dep.getArtifact.getGroupId}:${dep.getArtifact.getArtifactId}"
+                    val exclusion = result._2.contains(ga)
+                    if (exclusion) {
+                      val art = result._1.getArtifact
+                      val name = s"${art.getGroupId}:${art.getArtifactId}"
+                      logInfo(s"[MavenTool] $name dependencies exclusion $ga")
+                    }
+                    !exclusion
+                  case _ => false
+                }
+              })
+            .map(_.getArtifact))
+
+    val mergedArtifacts = artifacts.map(_._1) ++ dependenciesArtifacts
+
+    logInfo(s"resolved dependencies: ${mergedArtifacts.mkString}")
+
+    // download artifacts
+    val artReqs =
+      mergedArtifacts.map(artifact => new ArtifactRequest(artifact, remoteRepos, null))
+    repoSystem
+      .resolveArtifacts(session, artReqs)
+      .map(_.getArtifact.getFile)
+      .toSet
   }
 
   /** create composite maven endpoint */
@@ -256,13 +275,13 @@ object MavenTool extends Logger {
     (repoSystem, session)
   }
 
-  class ShadeFilter extends Filter {
+  private class ShadedFilter extends Filter {
     override def canFilter(jar: File): Boolean = true
 
     override def isFiltered(name: String): Boolean = {
       if (name.startsWith("META-INF/")) {
         if (name.endsWith(".SF") || name.endsWith(".DSA") || 
name.endsWith(".RSA")) {
-          logInfo(s"shade ignore file: $name")
+          logInfo(s"shaded ignore file: $name")
           return true
         }
       }
@@ -271,5 +290,4 @@ object MavenTool extends Logger {
 
     override def finished(): Unit = {}
   }
-
 }
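
[Editor's note] resolveArtifacts now threads each artifact's exclusion set ("groupId:artifactId" strings) through descriptor resolution and drops any compile-scope transitive dependency whose GA is excluded, either globally or per declaring artifact. The filter rule in isolation (a hedged sketch over the Aether Dependency type; keepDependency is illustrative):

    import org.eclipse.aether.graph.Dependency

    // Keep a transitive dependency only if it is compile scope, not globally
    // excluded, and not named in the declaring artifact's exclusion set.
    def keepDependency(
        dep: Dependency,
        exclusions: Set[String],
        globallyExcluded: Dependency => Boolean): Boolean = {
      val ga = s"${dep.getArtifact.getGroupId}:${dep.getArtifact.getArtifactId}"
      dep.getScope == "compile" && !globallyExcluded(dep) && !exclusions.contains(ga)
    }
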
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 5abc55b92..41c56aa6b 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
 import org.apache.streampark.common.enums.{DevelopmentMode, ExecutionMode}
 import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates
 import org.apache.streampark.flink.packer.docker.DockerConf
-import org.apache.streampark.flink.packer.maven.DependencyInfo
+import org.apache.streampark.flink.packer.maven.MavenArtifact
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -44,17 +44,17 @@ sealed trait FlinkBuildParam extends BuildParam {
 
   def flinkVersion: FlinkVersion
 
-  def dependencyInfo: DependencyInfo
+  def dependency: MavenArtifact
 
   def customFlinkUserJar: String
 
-  lazy val providedLibs: DependencyInfo = {
+  lazy val providedLibs: MavenArtifact = {
     val providedLibs =
      ArrayBuffer(localWorkspace.APP_JARS, localWorkspace.APP_PLUGINS, customFlinkUserJar)
     if (developmentMode == DevelopmentMode.FLINK_SQL) {
      providedLibs += s"${localWorkspace.APP_SHIMS}/flink-${flinkVersion.majorVersion}"
     }
-    dependencyInfo.merge(providedLibs.toSet)
+    dependency.merge(providedLibs.toSet)
   }
 
   def getShadedJarPath(rootWorkspace: String): String = {
@@ -79,7 +79,7 @@ case class FlinkK8sSessionBuildRequest(
     executionMode: ExecutionMode,
     developmentMode: DevelopmentMode,
     flinkVersion: FlinkVersion,
-    dependencyInfo: DependencyInfo,
+    dependency: MavenArtifact,
     clusterId: String,
     k8sNamespace: String)
   extends FlinkK8sBuildParam
@@ -92,7 +92,7 @@ case class FlinkK8sApplicationBuildRequest(
     executionMode: ExecutionMode,
     developmentMode: DevelopmentMode,
     flinkVersion: FlinkVersion,
-    dependencyInfo: DependencyInfo,
+    dependency: MavenArtifact,
     clusterId: String,
     k8sNamespace: String,
     flinkBaseImage: String,
@@ -110,7 +110,7 @@ case class FlinkRemotePerJobBuildRequest(
     executionMode: ExecutionMode,
     developmentMode: DevelopmentMode,
     flinkVersion: FlinkVersion,
-    dependencyInfo: DependencyInfo)
+    dependency: MavenArtifact)
   extends FlinkBuildParam
 
 case class FlinkYarnApplicationBuildRequest(
@@ -118,5 +118,5 @@ case class FlinkYarnApplicationBuildRequest(
     mainClass: String,
     yarnProvidedPath: String,
     developmentMode: DevelopmentMode,
-    dependencyInfo: DependencyInfo)
+    dependencyInfo: MavenArtifact)
   extends BuildParam
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 72d47991e..060209ff9 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -91,7 +91,7 @@ class FlinkK8sApplicationBuildPipeline(request: FlinkK8sApplicationBuildRequest)
       execStep(3) {
         val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace)
         val extJarLibs = request.developmentMode match {
-          case DevelopmentMode.FLINK_SQL => request.dependencyInfo.extJarLibs
+          case DevelopmentMode.FLINK_SQL => request.dependency.extJarLibs
           case DevelopmentMode.CUSTOM_CODE => Set[String]()
         }
         val shadedJar =
diff --git a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
index 7b0803c45..06831510c 100644
--- a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
+++ b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.flink.packer
 
-import org.apache.streampark.flink.packer.maven.{Artifact, DependencyInfo, MavenTool}
+import org.apache.streampark.flink.packer.maven.{Artifact, MavenArtifact, MavenTool}
 
 import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfterAll
@@ -87,7 +87,7 @@ class MavenToolSpec extends AnyWordSpec with BeforeAndAfterAll with Matchers {
         val fatJarPath = outputDir.concat("fat-3.jar")
         val fatJar = MavenTool.buildFatJar(
           null,
-          DependencyInfo(
+          MavenArtifact(
            Set(Artifact.of("org.apache.flink:flink-connector-kafka_2.11:1.13.0")),
             Set(path("jars/commons-dbutils-1.7.jar"))),
           fatJarPath)
