This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new d1a31558e jar upload bug fix (#2948)
d1a31558e is described below
commit d1a31558eafede6baf20de18ebcf6f1a362dbcdc
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Aug 21 11:55:44 2023 +0800
jar upload bug fix (#2948)
* jar upload bug fix
---
.../streampark/console/core/entity/Resource.java | 25 ++++++++++++++++++++++
.../core/service/impl/AppBuildPipeServiceImpl.java | 11 +++++++++-
.../core/service/impl/ApplicationServiceImpl.java | 22 +++++++++++++++++++
.../core/service/impl/ResourceServiceImpl.java | 20 +++++++++++------
4 files changed, 70 insertions(+), 8 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
index 22a1fb3e0..e02661f1a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java
@@ -82,4 +82,29 @@ public class Resource implements Serializable {
private transient String sortOrder;
private transient String connector;
+
+ public void setResourcePath(String resourcePath) {
+ if (resourcePath == null) {
+ throw new IllegalArgumentException("resource path cannot be null.");
+ }
+ String[] namePath = resourcePath.split(":");
+ if (namePath.length != 2) {
+ throw new IllegalArgumentException("resource path invalid, format:
$name:$path");
+ }
+ this.resourcePath = resourcePath;
+ }
+
+ public String getFileName() {
+ if (this.resourcePath == null) {
+ return null;
+ }
+ return resourcePath.split(":")[0];
+ }
+
+ public String getFilePath() {
+ if (this.resourcePath == null) {
+ return null;
+ }
+ return resourcePath.split(":")[1];
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index b7c37eee6..fa410a99a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -80,6 +80,7 @@ import
org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline
import
org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -234,13 +235,21 @@ public class AppBuildPipeServiceImpl
FsOperator fsOperator = app.getFsOperator();
fsOperator.delete(appHome);
if (app.isUploadJob()) {
+ String uploadJar = appUploads.concat("/").concat(app.getJar());
File localJar =
new File(
String.format(
"%s/%d/%s",
Workspace.local().APP_UPLOADS(), app.getTeamId(),
app.getJar()));
+ if (!localJar.exists()) {
+ Resource resource =
+ resourceService.findByResourceName(app.getTeamId(),
app.getJar());
+ if (resource != null &&
StringUtils.isNotBlank(resource.getFilePath())) {
+ localJar = new File(resource.getFilePath());
+ uploadJar =
appUploads.concat("/").concat(localJar.getName());
+ }
+ }
// upload jar copy to appHome
- String uploadJar = appUploads.concat("/").concat(app.getJar());
checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar,
appUploads);
switch (app.getApplicationType()) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 65bee297b..3dcf471ce 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.enums.RestoreMode;
import org.apache.streampark.common.enums.StorageType;
+import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.fs.LfsOperator;
import org.apache.streampark.common.util.CompletableFutureUtils;
@@ -49,6 +50,7 @@ import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Project;
+import org.apache.streampark.console.core.entity.Resource;
import org.apache.streampark.console.core.entity.SavePoint;
import org.apache.streampark.console.core.enums.AppExistsState;
import org.apache.streampark.console.core.enums.CandidateType;
@@ -74,6 +76,7 @@ import
org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.LogClientService;
import org.apache.streampark.console.core.service.ProjectService;
+import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
@@ -217,6 +220,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
+ @Autowired private ResourceService resourceService;
+
@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
@@ -728,6 +733,13 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
String jarPath =
String.format(
"%s/%d/%s", Workspace.local().APP_UPLOADS(),
appParam.getTeamId(), appParam.getJar());
+ if (!new File(jarPath).exists()) {
+ Resource resource =
+ resourceService.findByResourceName(appParam.getTeamId(),
appParam.getJar());
+ if (resource != null &&
StringUtils.isNotBlank(resource.getFilePath())) {
+ jarPath = resource.getFilePath();
+ }
+ }
appParam.setJarCheckSum(FileUtils.checksumCRC32(new File(jarPath)));
}
@@ -1484,6 +1496,16 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
break;
case APACHE_FLINK:
flinkUserJar = String.format("%s/%s", application.getAppHome(),
application.getJar());
+ if (!FsOperator.hdfs().exists(flinkUserJar)) {
+ Resource resource =
+ resourceService.findByResourceName(application.getTeamId(),
application.getJar());
+ if (resource != null &&
StringUtils.isNotBlank(resource.getFilePath())) {
+ flinkUserJar =
+ String.format(
+ "%s/%s",
+ application.getAppHome(), new
File(resource.getFilePath()).getName());
+ }
+ }
break;
default:
throw new IllegalArgumentException(
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index de3ec905c..8b900afe5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.MavenTool;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.Factory;
import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils;
@@ -204,13 +205,18 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
Resource findResource = getById(resource.getId());
checkOrElseAlert(findResource);
- FsOperator.lfs()
- .delete(
- String.format(
- "%s/%d/%s",
- Workspace.local().APP_UPLOADS(),
- findResource.getTeamId(),
- findResource.getResourceName()));
+ String filePath =
+ String.format(
+ "%s/%d/%s",
+ Workspace.local().APP_UPLOADS(),
+ findResource.getTeamId(),
+ findResource.getResourceName());
+
+ if (!new File(filePath).exists() &&
StringUtils.isNotBlank(findResource.getFilePath())) {
+ filePath = findResource.getFilePath();
+ }
+
+ FsOperator.lfs().delete(filePath);
this.removeById(resource);
}