This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new f23d515ba [Bug] deploy flink job on yarnApplication Mode bug fixed (#3325)
f23d515ba is described below

commit f23d515ba7cd23fbec303a2dd7823d634aaac6bf
Author: benjobs <[email protected]>
AuthorDate: Fri Nov 10 01:21:19 2023 +0800

    [Bug] deploy flink job on yarnApplication Mode bug fixed (#3325)
    
    * [Bug] yarn application bug fixed
    
    * [Bug] upload job to workspace bug fixed
    
    * [Improve] default hdfs workspace path improvement
    
    * [Improve] flink yarn-app mode improvement
    
    * [Improve] upload jar minor improvement
    
    * [Improve] release app error info improvement
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../conf/streampark-console-config/application.yml |   2 +-
 .../ApplicationBuildPipelineController.java        |  93 ++++++------
 .../console/core/entity/Application.java           |   5 +
 .../core/service/impl/AppBuildPipeServiceImpl.java | 160 +++++++++++++--------
 .../core/service/impl/ApplicationServiceImpl.java  |  19 ++-
 .../src/main/resources/application.yml             |   2 +-
 .../flink/client/impl/YarnApplicationClient.scala  |  15 +-
 .../streampark/flink/packer/maven/MavenTool.scala  |   5 +
 .../flink/packer/pipeline/BuildRequest.scala       |   1 -
 .../impl/FlinkYarnApplicationBuildPipeline.scala   |  46 +++---
 10 files changed, 183 insertions(+), 165 deletions(-)

diff --git a/deploy/helm/streampark/conf/streampark-console-config/application.yml b/deploy/helm/streampark/conf/streampark-console-config/application.yml
index 309e7e6af..2cee6fa8a 100755
--- a/deploy/helm/streampark/conf/streampark-console-config/application.yml
+++ b/deploy/helm/streampark/conf/streampark-console-config/application.yml
@@ -97,7 +97,7 @@ streampark:
   # local workspace, used to store source code and build dir etc.
   workspace:
     local: /opt/streampark_workspace
-    remote: hdfs://hdfscluster/streampark   # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/
+    remote: hdfs:///streampark   # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/
 
   # remote docker register namespace for streampark
   docker:
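
For reference, the three accepted forms named in the inline comment above look like this ("namenode-host:8020" is a placeholder authority, not a value from this commit):

    remote: hdfs://namenode-host:8020/streampark/   # explicit namenode authority
    remote: hdfs:///streampark/                     # default filesystem, absolute path
    remote: /streampark                             # scheme-less, resolved against fs.defaultFS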
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
index 52aecfcb5..65352306f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
@@ -95,56 +95,51 @@ public class ApplicationBuildPipelineController {
   @PermissionAction(id = "#appId", type = PermissionType.APP)
   @PostMapping(value = "build")
   @RequiresPermissions("app:create")
-  public RestResponse buildApplication(Long appId, boolean forceBuild) {
-    try {
-      Application app = applicationService.getById(appId);
-
-      // 1) check flink version
-      FlinkEnv env = flinkEnvService.getById(app.getVersionId());
-      boolean checkVersion = env.getFlinkVersion().checkVersion(false);
-      if (!checkVersion) {
-        throw new ApiAlertException(
-            "Unsupported flink version: " + env.getFlinkVersion().version());
-      }
-
-      // 2) check env
-      boolean envOk = applicationService.checkEnv(app);
-      if (!envOk) {
-        throw new ApiAlertException(
-            "Check flink env failed, please check the flink version of this job");
-      }
-
-      if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
-        throw new ApiAlertException(
-            "The job is invalid, or the job cannot be built while it is running");
-      }
-      // check if you need to go through the build process (if the jar and pom have changed,
-      // you need to go through the build process, if other common parameters are modified,
-      // you don't need to go through the build process)
-
-      ApplicationLog applicationLog = new ApplicationLog();
-      applicationLog.setOptionName(
-          org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
-      applicationLog.setAppId(app.getId());
-      applicationLog.setOptionTime(new Date());
-
-      boolean needBuild = applicationService.checkBuildAndUpdate(app);
-      if (!needBuild) {
-        applicationLog.setSuccess(true);
-        applicationLogService.save(applicationLog);
-        return RestResponse.success(true);
-      }
-
-      // rollback
-      if (app.isNeedRollback() && app.isFlinkSqlJob()) {
-        flinkSqlService.rollback(app);
-      }
-
-      boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog);
-      return RestResponse.success(actionResult);
-    } catch (Exception e) {
-      return RestResponse.success(false).message(e.getMessage());
+  public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception {
+    Application app = applicationService.getById(appId);
+
+    // 1) check flink version
+    FlinkEnv env = flinkEnvService.getById(app.getVersionId());
+    boolean checkVersion = env.getFlinkVersion().checkVersion(false);
+    if (!checkVersion) {
+      throw new ApiAlertException("Unsupported flink version: " + env.getFlinkVersion().version());
     }
+
+    // 2) check env
+    boolean envOk = applicationService.checkEnv(app);
+    if (!envOk) {
+      throw new ApiAlertException(
+          "Check flink env failed, please check the flink version of this job");
+    }
+
+    if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
+      throw new ApiAlertException(
+          "The job is invalid, or the job cannot be built while it is running");
+    }
+    // check if you need to go through the build process (if the jar and pom have changed,
+    // you need to go through the build process, if other common parameters are modified,
+    // you don't need to go through the build process)
+
+    ApplicationLog applicationLog = new ApplicationLog();
+    applicationLog.setOptionName(
+        org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+    applicationLog.setAppId(app.getId());
+    applicationLog.setOptionTime(new Date());
+
+    boolean needBuild = applicationService.checkBuildAndUpdate(app);
+    if (!needBuild) {
+      applicationLog.setSuccess(true);
+      applicationLogService.save(applicationLog);
+      return RestResponse.success(true);
+    }
+
+    // rollback
+    if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+      flinkSqlService.rollback(app);
+    }
+
+    boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog);
+    return RestResponse.success(actionResult);
   }
 
   /**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 568f7f2a5..5b3aa4db2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -439,6 +439,11 @@ public class Application implements Serializable {
     return getAppHome().concat("/lib");
   }
 
+  @JsonIgnore
+  public String getLocalAppLib() {
+    return getLocalAppHome().concat("/lib");
+  }
+
   @JsonIgnore
   public ApplicationType getApplicationType() {
     return ApplicationType.of(appType);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index f6fb7e641..07c66ff05 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -19,8 +19,6 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.enums.ApplicationType;
-import org.apache.streampark.common.enums.DevelopmentMode;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.FileUtils;
@@ -52,6 +50,7 @@ import org.apache.streampark.console.core.service.MessageService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.packer.docker.DockerConf;
+import org.apache.streampark.flink.packer.maven.MavenTool;
 import org.apache.streampark.flink.packer.pipeline.BuildPipeline;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
 import org.apache.streampark.flink.packer.pipeline.DockerBuildSnapshot;
@@ -87,6 +86,7 @@ import org.springframework.transaction.annotation.Transactional;
 import javax.annotation.Nonnull;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -180,44 +180,7 @@ public class AppBuildPipeServiceImpl
             applicationService.checkEnv(app);
 
             // 2) some preparatory work
-            String appUploads = app.getWorkspace().APP_UPLOADS();
-
-            if (app.isCustomCodeJob()) {
-              // customCode upload jar to appHome...
-              FsOperator fsOperator = app.getFsOperator();
-              if (app.isCICDJob()) {
-                String appHome = app.getAppHome();
-                fsOperator.mkCleanDirs(appHome);
-                fsOperator.upload(app.getDistHome(), appHome);
-              } else {
-              File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
-                // upload jar copy to appHome
-                String uploadJar = appUploads.concat("/").concat(app.getJar());
-                checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads);
-                if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
-                  fsOperator.mkdirs(app.getAppLib());
-                  fsOperator.copy(uploadJar, app.getAppLib(), false, true);
-                }
-              }
-            }
-
-            if (app.isFlinkSqlJob() || app.isUploadJob()) {
-              if (!app.getDependencyObject().getJar().isEmpty()) {
-                String localUploads = Workspace.local().APP_UPLOADS();
-                // copy jar to local upload dir
-                for (String jar : app.getDependencyObject().getJar()) {
-                  File localJar = new File(WebUtils.getAppTempDir(), jar);
-                  File uploadJar = new File(localUploads, jar);
-                  if (!localJar.exists() && !uploadJar.exists()) {
-                    throw new ApiAlertException("Missing file: " + jar + ", please upload again");
-                  }
-                  if (localJar.exists()) {
-                    checkOrElseUploadJar(
-                        FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploads);
-                  }
-                }
-              }
-            }
+            prepareJars(app);
           }
 
           @Override
@@ -325,24 +288,17 @@ public class AppBuildPipeServiceImpl
   /** create building pipeline instance */
   private BuildPipeline createPipelineInstance(@Nonnull Application app) {
     FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
-    String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
+    String userLocalJar = retrieveUserLocalJar(flinkEnv, app);
     ExecutionMode executionMode = app.getExecutionModeEnum();
     String mainClass =
        app.isCustomCodeJob() ? app.getMainClass() : ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS();
     switch (executionMode) {
       case YARN_APPLICATION:
         String yarnProvidedPath = app.getAppLib();
-        String localWorkspace = app.getLocalAppHome().concat("/lib");
-        if (app.getDevelopmentMode().equals(DevelopmentMode.CUSTOM_CODE)
-            && app.getApplicationType().equals(ApplicationType.APACHE_FLINK)) {
-          yarnProvidedPath = app.getAppHome();
-          localWorkspace = app.getLocalAppHome();
-        }
         FlinkYarnApplicationBuildRequest yarnAppRequest =
             new FlinkYarnApplicationBuildRequest(
                 app.getJobName(),
                 mainClass,
-                localWorkspace,
                 yarnProvidedPath,
                 app.getDevelopmentMode(),
                 app.getDependencyInfo());
@@ -356,7 +312,7 @@ public class AppBuildPipeServiceImpl
                 app.getJobName(),
                 app.getLocalAppHome(),
                 mainClass,
-                flinkUserJar,
+                userLocalJar,
                 app.getExecutionModeEnum(),
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
@@ -369,7 +325,7 @@ public class AppBuildPipeServiceImpl
                 app.getJobName(),
                 app.getLocalAppHome(),
                 mainClass,
-                flinkUserJar,
+                userLocalJar,
                 app.getExecutionModeEnum(),
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
@@ -384,7 +340,7 @@ public class AppBuildPipeServiceImpl
                 app.getJobName(),
                 app.getLocalAppHome(),
                 mainClass,
-                flinkUserJar,
+                userLocalJar,
                 app.getExecutionModeEnum(),
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
@@ -408,15 +364,97 @@ public class AppBuildPipeServiceImpl
     }
   }
 
+  private void prepareJars(Application app) {
+    File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
+    if (!localUploadDIR.exists()) {
+      localUploadDIR.mkdirs();
+    }
+
+    FsOperator localFS = FsOperator.lfs();
+    // 1. copy jar to local upload dir
+    if (app.isFlinkSqlJob() || app.isUploadJob()) {
+      if (!app.getDependencyObject().getJar().isEmpty()) {
+        for (String jar : app.getDependencyObject().getJar()) {
+          File localJar = new File(WebUtils.getAppTempDir(), jar);
+          File localUploadJar = new File(localUploadDIR, jar);
+          if (!localJar.exists() && !localUploadJar.exists()) {
+            throw new ApiAlertException("Missing file: " + jar + ", please upload again");
+          }
+          if (localJar.exists()) {
+            checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR);
+          }
+        }
+      }
+    }
+
+    if (app.isCustomCodeJob()) {
+      // customCode upload jar to appHome...
+      FsOperator fsOperator = app.getFsOperator();
+
+      if (app.isUploadJob()) {
+        // 1). upload jar to local uploadDIR.
+        File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
+        File localUploadJar = new File(localUploadDIR, app.getJar());
+        checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR);
+
+        // 2) copy jar to local $app_home/lib
+        boolean cleanUpload = false;
+        File libJar = new File(app.getLocalAppLib(), app.getJar());
+        if (!localFS.exists(app.getLocalAppLib())) {
+          cleanUpload = true;
+        } else {
+          if (libJar.exists()) {
+            if (!FileUtils.equals(localJar, libJar)) {
+              cleanUpload = true;
+            }
+          } else {
+            cleanUpload = true;
+          }
+        }
+
+        if (cleanUpload) {
+          localFS.mkCleanDirs(app.getLocalAppLib());
+          localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib());
+        }
+
+        // 3) for YARNApplication mode
+        if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+          List<File> jars = new ArrayList<>(0);
+
+          // 1) user jar
+          jars.add(libJar);
+
+          // 2). jar dependency
+          app.getDependencyObject()
+              .getJar()
+              .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
+
+          // 3). pom dependency
+          if (!app.getDependencyInfo().mavenArts().isEmpty()) {
+            jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));
+          }
+
+          fsOperator.mkCleanDirs(app.getAppLib());
+          // 4). upload jars to appLibDIR
+          jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib()));
+        }
+      } else {
+        String appHome = app.getAppHome();
+        fsOperator.mkCleanDirs(appHome);
+        fsOperator.upload(app.getDistHome(), appHome);
+      }
+    }
+  }
+
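
A reading aid for the cleanUpload branch in prepareJars above: the nested checks collapse to a single condition, shown here as an equivalent sketch (editorial illustration, not code from this commit):

    // re-stage $app_home/lib when the lib dir is missing, the jar is missing,
    // or the uploaded jar's content changed
    boolean cleanUpload =
        !localFS.exists(app.getLocalAppLib())
            || !libJar.exists()
            || !FileUtils.equals(localJar, libJar);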
   /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
-  private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) {
+  private String retrieveUserLocalJar(FlinkEnv flinkEnv, Application app) {
     switch (app.getDevelopmentMode()) {
       case CUSTOM_CODE:
         switch (app.getApplicationType()) {
           case STREAMPARK_FLINK:
-            return String.format("%s/%s", app.getAppLib(), app.getModule().concat(".jar"));
+            return String.format("%s/%s", app.getLocalAppLib(), app.getModule().concat(".jar"));
           case APACHE_FLINK:
-            return String.format("%s/%s", WebUtils.getAppTempDir(), app.getJar());
+            return String.format("%s/%s", app.getLocalAppLib(), app.getJar());
           default:
             throw new IllegalArgumentException(
                 "[StreamPark] unsupported ApplicationType of custom code: "
@@ -424,10 +462,6 @@ public class AppBuildPipeServiceImpl
         }
       case FLINK_SQL:
         String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
-        if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
-          String clientPath = Workspace.remote().APP_CLIENT();
-          return String.format("%s/%s", clientPath, sqlDistJar);
-        }
         return Workspace.local().APP_CLIENT().concat("/").concat(sqlDistJar);
       default:
         throw new UnsupportedOperationException(
@@ -487,13 +521,13 @@ public class AppBuildPipeServiceImpl
   }
 
   private void checkOrElseUploadJar(
-      FsOperator fsOperator, File localJar, String targetJar, String targetDir) {
-    if (!fsOperator.exists(targetJar)) {
-      fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true);
+      FsOperator fsOperator, File localJar, File targetJar, File targetDir) {
+    if (!fsOperator.exists(targetJar.getAbsolutePath())) {
+      fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath());
     } else {
      // The file exists to check whether it is consistent, and if it is inconsistent, re-upload it
-      if (!FileUtils.equals(localJar, new File(targetJar))) {
-        fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true);
+      if (!FileUtils.equals(localJar, targetJar)) {
+        fsOperator.upload(localJar.getAbsolutePath(), targetDir.getAbsolutePath());
       }
     }
   }
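
The net effect of checkOrElseUploadJar is an idempotent upload: the jar is copied only when it is absent from the target or its content differs (via FileUtils.equals). A minimal call-site sketch, with the jar name assumed for illustration:

    File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
    File localJar = new File(WebUtils.getAppTempDir(), "demo-job.jar"); // hypothetical name
    checkOrElseUploadJar(FsOperator.lfs(), localJar, new File(localUploadDIR, "demo-job.jar"), localUploadDIR);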
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index e23c12564..e894bd626 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -704,16 +704,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     ApiAlertException.throwIfNull(
        appParam.getTeamId(), "The teamId can't be null. Create application failed.");
 
-    if (appParam.isFlinkSqlJob()) {
-      appParam.setBuild(true);
-    } else {
-      if (appParam.isUploadJob()) {
-        appParam.setBuild(!appParam.getDependencyObject().isEmpty());
-      } else {
-        appParam.setBuild(false);
-      }
-    }
-
+    appParam.setBuild(true);
     appParam.setUserId(commonService.getUserId());
     appParam.setState(FlinkAppState.ADDED.getValue());
     appParam.setRelease(ReleaseState.NEED_RELEASE.get());
@@ -819,6 +810,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     newApp.setJarCheckSum(oldApp.getJarCheckSum());
     newApp.setTags(oldApp.getTags());
     newApp.setTeamId(oldApp.getTeamId());
+    newApp.setDependency(oldApp.getDependency());
 
     boolean saved = save(newApp);
     if (saved) {
@@ -1499,7 +1491,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                    "%s/%s", application.getAppLib(), application.getModule().concat(".jar"));
             break;
           case APACHE_FLINK:
-            flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
+            if (application.getFsOperator().exists(application.getAppLib())) {
+              flinkUserJar = String.format("%s/%s", application.getAppLib(), application.getJar());
+            } else {
+              // compatible with historical version
+              flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
+            }
             break;
           default:
             throw new IllegalArgumentException(
diff --git a/streampark-console/streampark-console-service/src/main/resources/application.yml b/streampark-console/streampark-console-service/src/main/resources/application.yml
index db59bf85e..52893833f 100644
--- a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++ b/streampark-console/streampark-console-service/src/main/resources/application.yml
@@ -97,7 +97,7 @@ streampark:
   # local workspace, used to store source code and build dir etc.
   workspace:
     local: /opt/streampark_workspace
-    remote: hdfs://hdfscluster/streampark   # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/
+    remote: hdfs:///streampark   # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/
 
   # remote docker register namespace for streampark
   docker:
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 2b981b305..26af8f8ab 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -62,18 +62,15 @@ object YarnApplicationClient extends YarnClientTrait {
     val providedLibs = {
       val array = ListBuffer(
         submitRequest.hdfsWorkspace.flinkLib,
-        submitRequest.hdfsWorkspace.flinkPlugins,
         submitRequest.hdfsWorkspace.appJars,
         submitRequest.hdfsWorkspace.appPlugins
       )
-      submitRequest.developmentMode match {
-        case DevelopmentMode.FLINK_SQL =>
-          array += s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}"
-          val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
-          if (HdfsUtils.exists(jobLib)) {
-            array += jobLib
-          }
-        case _ =>
+      val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
+      if (HdfsUtils.exists(jobLib)) {
+        array += jobLib
+      }
+      if (submitRequest.developmentMode == DevelopmentMode.FLINK_SQL) {
+        array += s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}"
       }
       array.toList
     }
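
With this change the per-job lib directory is shipped for every development mode rather than only FLINK_SQL, hdfsWorkspace.flinkPlugins is dropped from the list entirely, and only the shims directory stays SQL-specific. The assembled provided-libs list, in the names used by the code above:

    submitRequest.hdfsWorkspace.flinkLib
    submitRequest.hdfsWorkspace.appJars
    submitRequest.hdfsWorkspace.appPlugins
    ${workspace.APP_WORKSPACE}/${submitRequest.id}/lib    (only when it exists on HDFS)
    ${workspace.APP_SHIMS}/flink-<majorVersion>           (FLINK_SQL jobs only)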
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 5dab91892..05562a870 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -43,8 +43,10 @@ import javax.annotation.{Nonnull, Nullable}
 
 import java.io.File
 import java.util
+import java.util.{HashSet, Set => JavaSet}
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 
@@ -176,6 +178,9 @@ object MavenTool extends Logger {
     buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
   }
 
+  def resolveArtifactsAsJava(mavenArtifacts: Set[Artifact]): JavaSet[File] = resolveArtifacts(
+    mavenArtifacts).asJava
+
   /**
   * Resolve the collection of artifacts. Artifacts will be downloaded to ConfigConst.MAVEN_LOCAL_DIR
    * if necessary. notes: Only compile scope dependencies will be resolved.
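
resolveArtifactsAsJava is a thin Java-friendly wrapper over the existing resolveArtifacts: it returns a java.util.Set (JavaSet is aliased to java.util.Set in the imports above), so Java callers need no Scala collection conversions. The call site added earlier in this diff, in AppBuildPipeServiceImpl.prepareJars, uses it as:

    jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));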
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 62f90935a..5abc55b92 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -116,7 +116,6 @@ case class FlinkRemotePerJobBuildRequest(
 case class FlinkYarnApplicationBuildRequest(
     appName: String,
     mainClass: String,
-    localWorkspace: String,
     yarnProvidedPath: String,
     developmentMode: DevelopmentMode,
     dependencyInfo: DependencyInfo)
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 92c1d9199..dcf6ad86f 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.packer.pipeline.impl
 
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.DevelopmentMode
-import org.apache.streampark.common.fs.{FsOperator, HdfsOperator, LfsOperator}
+import org.apache.streampark.common.fs.{FsOperator, HdfsOperator}
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.packer.maven.MavenTool
 import org.apache.streampark.flink.packer.pipeline._
@@ -45,9 +45,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
   override protected def buildProcess(): SimpleBuildResponse = {
     execStep(1) {
       request.developmentMode match {
-        case DevelopmentMode.FLINK_SQL =>
-          LfsOperator.mkCleanDirs(request.localWorkspace)
-          HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
+        case DevelopmentMode.FLINK_SQL => HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
         case _ =>
       }
       logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
@@ -64,11 +62,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
       }.getOrElse(throw getError.exception)
 
     execStep(3) {
-      mavenJars.foreach(
-        jar => {
-          uploadToHdfs(FsOperator.lfs, jar, request.localWorkspace)
-          uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath)
-        })
+      mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath))
     }.getOrElse(throw getError.exception)
 
     SimpleBuildResponse()
@@ -82,29 +76,21 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
     }
     if (originFile.isFile) {
       // check file in upload dir
-      fsOperator match {
-        case FsOperator.lfs =>
-          fsOperator.copy(originFile.getAbsolutePath, target)
-        case FsOperator.hdfs =>
-          val uploadFile = s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
-          if (fsOperator.exists(uploadFile)) {
-            Utils.using(new FileInputStream(originFile))(
-              inputStream => {
-                if (DigestUtils.md5Hex(inputStream) != fsOperator.fileMd5(uploadFile)) {
-                  fsOperator.upload(originFile.getAbsolutePath, uploadFile)
-                }
-              })
-          } else {
-            fsOperator.upload(originFile.getAbsolutePath, uploadFile)
-          }
-          // copy jar from upload dir to target dir
-          fsOperator.copy(uploadFile, target)
+      val uploadFile = s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
+      if (fsOperator.exists(uploadFile)) {
+        Utils.using(new FileInputStream(originFile))(
+          inputStream => {
+            if (DigestUtils.md5Hex(inputStream) != fsOperator.fileMd5(uploadFile)) {
+              fsOperator.upload(originFile.getAbsolutePath, uploadFile)
+            }
+          })
+      } else {
+        fsOperator.upload(originFile.getAbsolutePath, uploadFile)
       }
+      // copy jar from upload dir to target dir
+      fsOperator.copy(uploadFile, target)
     } else {
-      fsOperator match {
-        case FsOperator.hdfs => fsOperator.upload(originFile.getAbsolutePath, target)
-        case _ =>
-      }
+      fsOperator.upload(originFile.getAbsolutePath, target)
     }
   }
 
