This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new d1a31558e jar upload bug fix (#2948)
d1a31558e is described below

commit d1a31558eafede6baf20de18ebcf6f1a362dbcdc
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Aug 21 11:55:44 2023 +0800

    jar upload bug fix (#2948)
    
    * jar upload bug fix
---
 .../streampark/console/core/entity/Resource.java   | 25 ++++++++++++++++++++++
 .../core/service/impl/AppBuildPipeServiceImpl.java | 11 +++++++++-
 .../core/service/impl/ApplicationServiceImpl.java  | 22 +++++++++++++++++++
 .../core/service/impl/ResourceServiceImpl.java     | 20 +++++++++++------
 4 files changed, 70 insertions(+), 8 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
index 22a1fb3e0..e02661f1a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
@@ -82,4 +82,29 @@ public class Resource implements Serializable {
   private transient String sortOrder;
 
   private transient String connector;
+
+  public void setResourcePath(String resourcePath) {
+    if (resourcePath == null) {
+      throw new IllegalArgumentException("resource path cannot be null.");
+    }
+    String[] namePath = resourcePath.split(":");
+    if (namePath.length != 2) {
+      throw new IllegalArgumentException("resource path invalid, format: 
$name:$path");
+    }
+    this.resourcePath = resourcePath;
+  }
+
+  public String getFileName() {
+    if (this.resourcePath == null) {
+      return null;
+    }
+    return resourcePath.split(":")[0];
+  }
+
+  public String getFilePath() {
+    if (this.resourcePath == null) {
+      return null;
+    }
+    return resourcePath.split(":")[1];
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index b7c37eee6..fa410a99a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -80,6 +80,7 @@ import 
org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline
 import 
org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -234,13 +235,21 @@ public class AppBuildPipeServiceImpl
               FsOperator fsOperator = app.getFsOperator();
               fsOperator.delete(appHome);
               if (app.isUploadJob()) {
+                String uploadJar = appUploads.concat("/").concat(app.getJar());
                 File localJar =
                     new File(
                         String.format(
                             "%s/%d/%s",
                             Workspace.local().APP_UPLOADS(), app.getTeamId(), 
app.getJar()));
+                if (!localJar.exists()) {
+                  Resource resource =
+                      resourceService.findByResourceName(app.getTeamId(), 
app.getJar());
+                  if (resource != null && 
StringUtils.isNotBlank(resource.getFilePath())) {
+                    localJar = new File(resource.getFilePath());
+                    uploadJar = 
appUploads.concat("/").concat(localJar.getName());
+                  }
+                }
                 // upload jar copy to appHome
-                String uploadJar = appUploads.concat("/").concat(app.getJar());
                 checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, 
appUploads);
 
                 switch (app.getApplicationType()) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 65bee297b..3dcf471ce 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.enums.RestoreMode;
 import org.apache.streampark.common.enums.StorageType;
+import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.fs.HdfsOperator;
 import org.apache.streampark.common.fs.LfsOperator;
 import org.apache.streampark.common.util.CompletableFutureUtils;
@@ -49,6 +50,7 @@ import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Project;
+import org.apache.streampark.console.core.entity.Resource;
 import org.apache.streampark.console.core.entity.SavePoint;
 import org.apache.streampark.console.core.enums.AppExistsState;
 import org.apache.streampark.console.core.enums.CandidateType;
@@ -74,6 +76,7 @@ import 
org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.LogClientService;
 import org.apache.streampark.console.core.service.ProjectService;
+import org.apache.streampark.console.core.service.ResourceService;
 import org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
@@ -217,6 +220,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
   @Autowired private FlinkClusterWatcher flinkClusterWatcher;
 
+  @Autowired private ResourceService resourceService;
+
   @PostConstruct
   public void resetOptionState() {
     this.baseMapper.resetOptionState();
@@ -728,6 +733,13 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       String jarPath =
           String.format(
               "%s/%d/%s", Workspace.local().APP_UPLOADS(), 
appParam.getTeamId(), appParam.getJar());
+      if (!new File(jarPath).exists()) {
+        Resource resource =
+            resourceService.findByResourceName(appParam.getTeamId(), 
appParam.getJar());
+        if (resource != null && 
StringUtils.isNotBlank(resource.getFilePath())) {
+          jarPath = resource.getFilePath();
+        }
+      }
       appParam.setJarCheckSum(FileUtils.checksumCRC32(new File(jarPath)));
     }
 
@@ -1484,6 +1496,16 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             break;
           case APACHE_FLINK:
             flinkUserJar = String.format("%s/%s", application.getAppHome(), 
application.getJar());
+            if (!FsOperator.hdfs().exists(flinkUserJar)) {
+              Resource resource =
+                  resourceService.findByResourceName(application.getTeamId(), 
application.getJar());
+              if (resource != null && 
StringUtils.isNotBlank(resource.getFilePath())) {
+                flinkUserJar =
+                    String.format(
+                        "%s/%s",
+                        application.getAppHome(), new 
File(resource.getFilePath()).getName());
+              }
+            }
             break;
           default:
             throw new IllegalArgumentException(
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index de3ec905c..8b900afe5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.streampark.flink.packer.maven.Artifact;
 import org.apache.streampark.flink.packer.maven.MavenTool;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.factories.Factory;
 import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils;
@@ -204,13 +205,18 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
     Resource findResource = getById(resource.getId());
     checkOrElseAlert(findResource);
 
-    FsOperator.lfs()
-        .delete(
-            String.format(
-                "%s/%d/%s",
-                Workspace.local().APP_UPLOADS(),
-                findResource.getTeamId(),
-                findResource.getResourceName()));
+    String filePath =
+        String.format(
+            "%s/%d/%s",
+            Workspace.local().APP_UPLOADS(),
+            findResource.getTeamId(),
+            findResource.getResourceName());
+
+    if (!new File(filePath).exists() && 
StringUtils.isNotBlank(findResource.getFilePath())) {
+      filePath = findResource.getFilePath();
+    }
+
+    FsOperator.lfs().delete(filePath);
 
     this.removeById(resource);
   }

Reply via email to