This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new f23d515ba [Bug] deploy flink job on yarnApplication Mode bug fixed
(#3325)
f23d515ba is described below
commit f23d515ba7cd23fbec303a2dd7823d634aaac6bf
Author: benjobs <[email protected]>
AuthorDate: Fri Nov 10 01:21:19 2023 +0800
[Bug] deploy flink job on yarnApplication Mode bug fixed (#3325)
* [Bug] yarn application bug fixed
* [Bug] upload job to workspace bug fixed
* [Improve] default hdfs workspace path improvement
* [Improve] flink yarn-app mode improvement
* [Improve] upload jar minor improvement
* [Improve] release app error info improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../conf/streampark-console-config/application.yml | 2 +-
.../ApplicationBuildPipelineController.java | 93 ++++++------
.../console/core/entity/Application.java | 5 +
.../core/service/impl/AppBuildPipeServiceImpl.java | 160 +++++++++++++--------
.../core/service/impl/ApplicationServiceImpl.java | 19 ++-
.../src/main/resources/application.yml | 2 +-
.../flink/client/impl/YarnApplicationClient.scala | 15 +-
.../streampark/flink/packer/maven/MavenTool.scala | 5 +
.../flink/packer/pipeline/BuildRequest.scala | 1 -
.../impl/FlinkYarnApplicationBuildPipeline.scala | 46 +++---
10 files changed, 183 insertions(+), 165 deletions(-)
diff --git
a/deploy/helm/streampark/conf/streampark-console-config/application.yml
b/deploy/helm/streampark/conf/streampark-console-config/application.yml
index 309e7e6af..2cee6fa8a 100755
--- a/deploy/helm/streampark/conf/streampark-console-config/application.yml
+++ b/deploy/helm/streampark/conf/streampark-console-config/application.yml
@@ -97,7 +97,7 @@ streampark:
# local workspace, used to store source code and build dir etc.
workspace:
local: /opt/streampark_workspace
- remote: hdfs://hdfscluster/streampark # support hdfs:///streampark/ 、
/streampark 、hdfs://host:ip/streampark/
+ remote: hdfs:///streampark # support hdfs:///streampark/ 、 /streampark
、hdfs://host:ip/streampark/
# remote docker register namespace for streampark
docker:
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
index 52aecfcb5..65352306f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
@@ -95,56 +95,51 @@ public class ApplicationBuildPipelineController {
@PermissionAction(id = "#appId", type = PermissionType.APP)
@PostMapping(value = "build")
@RequiresPermissions("app:create")
- public RestResponse buildApplication(Long appId, boolean forceBuild) {
- try {
- Application app = applicationService.getById(appId);
-
- // 1) check flink version
- FlinkEnv env = flinkEnvService.getById(app.getVersionId());
- boolean checkVersion = env.getFlinkVersion().checkVersion(false);
- if (!checkVersion) {
- throw new ApiAlertException(
- "Unsupported flink version: " + env.getFlinkVersion().version());
- }
-
- // 2) check env
- boolean envOk = applicationService.checkEnv(app);
- if (!envOk) {
- throw new ApiAlertException(
- "Check flink env failed, please check the flink version of this
job");
- }
-
- if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
- throw new ApiAlertException(
- "The job is invalid, or the job cannot be built while it is
running");
- }
- // check if you need to go through the build process (if the jar and pom
have changed,
- // you need to go through the build process, if other common parameters
are modified,
- // you don't need to go through the build process)
-
- ApplicationLog applicationLog = new ApplicationLog();
- applicationLog.setOptionName(
-
org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
- applicationLog.setAppId(app.getId());
- applicationLog.setOptionTime(new Date());
-
- boolean needBuild = applicationService.checkBuildAndUpdate(app);
- if (!needBuild) {
- applicationLog.setSuccess(true);
- applicationLogService.save(applicationLog);
- return RestResponse.success(true);
- }
-
- // rollback
- if (app.isNeedRollback() && app.isFlinkSqlJob()) {
- flinkSqlService.rollback(app);
- }
-
- boolean actionResult = appBuildPipeService.buildApplication(app,
applicationLog);
- return RestResponse.success(actionResult);
- } catch (Exception e) {
- return RestResponse.success(false).message(e.getMessage());
+ public RestResponse buildApplication(Long appId, boolean forceBuild) throws
Exception {
+ Application app = applicationService.getById(appId);
+
+ // 1) check flink version
+ FlinkEnv env = flinkEnvService.getById(app.getVersionId());
+ boolean checkVersion = env.getFlinkVersion().checkVersion(false);
+ if (!checkVersion) {
+ throw new ApiAlertException("Unsupported flink version: " +
env.getFlinkVersion().version());
}
+
+ // 2) check env
+ boolean envOk = applicationService.checkEnv(app);
+ if (!envOk) {
+ throw new ApiAlertException(
+ "Check flink env failed, please check the flink version of this
job");
+ }
+
+ if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
+ throw new ApiAlertException(
+ "The job is invalid, or the job cannot be built while it is
running");
+ }
+ // check if you need to go through the build process (if the jar and pom
have changed,
+ // you need to go through the build process, if other common parameters
are modified,
+ // you don't need to go through the build process)
+
+ ApplicationLog applicationLog = new ApplicationLog();
+ applicationLog.setOptionName(
+ org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+ applicationLog.setAppId(app.getId());
+ applicationLog.setOptionTime(new Date());
+
+ boolean needBuild = applicationService.checkBuildAndUpdate(app);
+ if (!needBuild) {
+ applicationLog.setSuccess(true);
+ applicationLogService.save(applicationLog);
+ return RestResponse.success(true);
+ }
+
+ // rollback
+ if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+ flinkSqlService.rollback(app);
+ }
+
+ boolean actionResult = appBuildPipeService.buildApplication(app,
applicationLog);
+ return RestResponse.success(actionResult);
}
/**
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 568f7f2a5..5b3aa4db2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -439,6 +439,11 @@ public class Application implements Serializable {
return getAppHome().concat("/lib");
}
+ @JsonIgnore
+ public String getLocalAppLib() {
+ return getLocalAppHome().concat("/lib");
+ }
+
@JsonIgnore
public ApplicationType getApplicationType() {
return ApplicationType.of(appType);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index f6fb7e641..07c66ff05 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -19,8 +19,6 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.enums.ApplicationType;
-import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.FileUtils;
@@ -52,6 +50,7 @@ import
org.apache.streampark.console.core.service.MessageService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
import org.apache.streampark.flink.packer.docker.DockerConf;
+import org.apache.streampark.flink.packer.maven.MavenTool;
import org.apache.streampark.flink.packer.pipeline.BuildPipeline;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.DockerBuildSnapshot;
@@ -87,6 +86,7 @@ import
org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import java.io.File;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -180,44 +180,7 @@ public class AppBuildPipeServiceImpl
applicationService.checkEnv(app);
// 2) some preparatory work
- String appUploads = app.getWorkspace().APP_UPLOADS();
-
- if (app.isCustomCodeJob()) {
- // customCode upload jar to appHome...
- FsOperator fsOperator = app.getFsOperator();
- if (app.isCICDJob()) {
- String appHome = app.getAppHome();
- fsOperator.mkCleanDirs(appHome);
- fsOperator.upload(app.getDistHome(), appHome);
- } else {
- File localJar = new File(WebUtils.getAppTempDir(),
app.getJar());
- // upload jar copy to appHome
- String uploadJar = appUploads.concat("/").concat(app.getJar());
- checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar,
appUploads);
- if (app.getApplicationType() ==
ApplicationType.STREAMPARK_FLINK) {
- fsOperator.mkdirs(app.getAppLib());
- fsOperator.copy(uploadJar, app.getAppLib(), false, true);
- }
- }
- }
-
- if (app.isFlinkSqlJob() || app.isUploadJob()) {
- if (!app.getDependencyObject().getJar().isEmpty()) {
- String localUploads = Workspace.local().APP_UPLOADS();
- // copy jar to local upload dir
- for (String jar : app.getDependencyObject().getJar()) {
- File localJar = new File(WebUtils.getAppTempDir(), jar);
- File uploadJar = new File(localUploads, jar);
- if (!localJar.exists() && !uploadJar.exists()) {
- throw new ApiAlertException("Missing file: " + jar + ",
please upload again");
- }
- if (localJar.exists()) {
- checkOrElseUploadJar(
- FsOperator.lfs(), localJar,
uploadJar.getAbsolutePath(), localUploads);
- }
- }
- }
- }
+ prepareJars(app);
}
@Override
@@ -325,24 +288,17 @@ public class AppBuildPipeServiceImpl
/** create building pipeline instance */
private BuildPipeline createPipelineInstance(@Nonnull Application app) {
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
- String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
+ String userLocalJar = retrieveUserLocalJar(flinkEnv, app);
ExecutionMode executionMode = app.getExecutionModeEnum();
String mainClass =
app.isCustomCodeJob() ? app.getMainClass() :
ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS();
switch (executionMode) {
case YARN_APPLICATION:
String yarnProvidedPath = app.getAppLib();
- String localWorkspace = app.getLocalAppHome().concat("/lib");
- if (app.getDevelopmentMode().equals(DevelopmentMode.CUSTOM_CODE)
- && app.getApplicationType().equals(ApplicationType.APACHE_FLINK)) {
- yarnProvidedPath = app.getAppHome();
- localWorkspace = app.getLocalAppHome();
- }
FlinkYarnApplicationBuildRequest yarnAppRequest =
new FlinkYarnApplicationBuildRequest(
app.getJobName(),
mainClass,
- localWorkspace,
yarnProvidedPath,
app.getDevelopmentMode(),
app.getDependencyInfo());
@@ -356,7 +312,7 @@ public class AppBuildPipeServiceImpl
app.getJobName(),
app.getLocalAppHome(),
mainClass,
- flinkUserJar,
+ userLocalJar,
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
@@ -369,7 +325,7 @@ public class AppBuildPipeServiceImpl
app.getJobName(),
app.getLocalAppHome(),
mainClass,
- flinkUserJar,
+ userLocalJar,
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
@@ -384,7 +340,7 @@ public class AppBuildPipeServiceImpl
app.getJobName(),
app.getLocalAppHome(),
mainClass,
- flinkUserJar,
+ userLocalJar,
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
@@ -408,15 +364,97 @@ public class AppBuildPipeServiceImpl
}
}
+ private void prepareJars(Application app) {
+ File localUploadDIR = new File(Workspace.local().APP_UPLOADS());
+ if (!localUploadDIR.exists()) {
+ localUploadDIR.mkdirs();
+ }
+
+ FsOperator localFS = FsOperator.lfs();
+ // 1. copy jar to local upload dir
+ if (app.isFlinkSqlJob() || app.isUploadJob()) {
+ if (!app.getDependencyObject().getJar().isEmpty()) {
+ for (String jar : app.getDependencyObject().getJar()) {
+ File localJar = new File(WebUtils.getAppTempDir(), jar);
+ File localUploadJar = new File(localUploadDIR, jar);
+ if (!localJar.exists() && !localUploadJar.exists()) {
+ throw new ApiAlertException("Missing file: " + jar + ", please
upload again");
+ }
+ if (localJar.exists()) {
+ checkOrElseUploadJar(localFS, localJar, localUploadJar,
localUploadDIR);
+ }
+ }
+ }
+ }
+
+ if (app.isCustomCodeJob()) {
+ // customCode upload jar to appHome...
+ FsOperator fsOperator = app.getFsOperator();
+
+ if (app.isUploadJob()) {
+ // 1). upload jar to local uploadDIR.
+ File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
+ File localUploadJar = new File(localUploadDIR, app.getJar());
+ checkOrElseUploadJar(localFS, localJar, localUploadJar,
localUploadDIR);
+
+ // 2) copy jar to local $app_home/lib
+ boolean cleanUpload = false;
+ File libJar = new File(app.getLocalAppLib(), app.getJar());
+ if (!localFS.exists(app.getLocalAppLib())) {
+ cleanUpload = true;
+ } else {
+ if (libJar.exists()) {
+ if (!FileUtils.equals(localJar, libJar)) {
+ cleanUpload = true;
+ }
+ } else {
+ cleanUpload = true;
+ }
+ }
+
+ if (cleanUpload) {
+ localFS.mkCleanDirs(app.getLocalAppLib());
+ localFS.upload(localUploadJar.getAbsolutePath(),
app.getLocalAppLib());
+ }
+
+ // 3) for YARNApplication mode
+ if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
+ List<File> jars = new ArrayList<>(0);
+
+ // 1) user jar
+ jars.add(libJar);
+
+ // 2). jar dependency
+ app.getDependencyObject()
+ .getJar()
+ .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
+
+ // 3). pom dependency
+ if (!app.getDependencyInfo().mavenArts().isEmpty()) {
+
jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));
+ }
+
+ fsOperator.mkCleanDirs(app.getAppLib());
+ // 4). upload jars to appLibDIR
+ jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(),
app.getAppLib()));
+ }
+ } else {
+ String appHome = app.getAppHome();
+ fsOperator.mkCleanDirs(appHome);
+ fsOperator.upload(app.getDistHome(), appHome);
+ }
+ }
+ }
+
/** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
- private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) {
+ private String retrieveUserLocalJar(FlinkEnv flinkEnv, Application app) {
switch (app.getDevelopmentMode()) {
case CUSTOM_CODE:
switch (app.getApplicationType()) {
case STREAMPARK_FLINK:
- return String.format("%s/%s", app.getAppLib(),
app.getModule().concat(".jar"));
+ return String.format("%s/%s", app.getLocalAppLib(),
app.getModule().concat(".jar"));
case APACHE_FLINK:
- return String.format("%s/%s", WebUtils.getAppTempDir(),
app.getJar());
+ return String.format("%s/%s", app.getLocalAppLib(), app.getJar());
default:
throw new IllegalArgumentException(
"[StreamPark] unsupported ApplicationType of custom code: "
@@ -424,10 +462,6 @@ public class AppBuildPipeServiceImpl
}
case FLINK_SQL:
String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
- if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
- String clientPath = Workspace.remote().APP_CLIENT();
- return String.format("%s/%s", clientPath, sqlDistJar);
- }
return Workspace.local().APP_CLIENT().concat("/").concat(sqlDistJar);
default:
throw new UnsupportedOperationException(
@@ -487,13 +521,13 @@ public class AppBuildPipeServiceImpl
}
private void checkOrElseUploadJar(
- FsOperator fsOperator, File localJar, String targetJar, String
targetDir) {
- if (!fsOperator.exists(targetJar)) {
- fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true);
+ FsOperator fsOperator, File localJar, File targetJar, File targetDir) {
+ if (!fsOperator.exists(targetJar.getAbsolutePath())) {
+ fsOperator.upload(localJar.getAbsolutePath(),
targetDir.getAbsolutePath());
} else {
// The file exists: check whether it is consistent, and if it is
inconsistent, re-upload it
- if (!FileUtils.equals(localJar, new File(targetJar))) {
- fsOperator.upload(localJar.getAbsolutePath(), targetDir, false, true);
+ if (!FileUtils.equals(localJar, targetJar)) {
+ fsOperator.upload(localJar.getAbsolutePath(),
targetDir.getAbsolutePath());
}
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index e23c12564..e894bd626 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -704,16 +704,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
ApiAlertException.throwIfNull(
appParam.getTeamId(), "The teamId can't be null. Create application
failed.");
- if (appParam.isFlinkSqlJob()) {
- appParam.setBuild(true);
- } else {
- if (appParam.isUploadJob()) {
- appParam.setBuild(!appParam.getDependencyObject().isEmpty());
- } else {
- appParam.setBuild(false);
- }
- }
-
+ appParam.setBuild(true);
appParam.setUserId(commonService.getUserId());
appParam.setState(FlinkAppState.ADDED.getValue());
appParam.setRelease(ReleaseState.NEED_RELEASE.get());
@@ -819,6 +810,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
newApp.setJarCheckSum(oldApp.getJarCheckSum());
newApp.setTags(oldApp.getTags());
newApp.setTeamId(oldApp.getTeamId());
+ newApp.setDependency(oldApp.getDependency());
boolean saved = save(newApp);
if (saved) {
@@ -1499,7 +1491,12 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
"%s/%s", application.getAppLib(),
application.getModule().concat(".jar"));
break;
case APACHE_FLINK:
- flinkUserJar = String.format("%s/%s", application.getAppHome(),
application.getJar());
+ if (application.getFsOperator().exists(application.getAppLib())) {
+ flinkUserJar = String.format("%s/%s", application.getAppLib(),
application.getJar());
+ } else {
+ // compatible with historical version
+ flinkUserJar = String.format("%s/%s", application.getAppHome(),
application.getJar());
+ }
break;
default:
throw new IllegalArgumentException(
diff --git
a/streampark-console/streampark-console-service/src/main/resources/application.yml
b/streampark-console/streampark-console-service/src/main/resources/application.yml
index db59bf85e..52893833f 100644
---
a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++
b/streampark-console/streampark-console-service/src/main/resources/application.yml
@@ -97,7 +97,7 @@ streampark:
# local workspace, used to store source code and build dir etc.
workspace:
local: /opt/streampark_workspace
- remote: hdfs://hdfscluster/streampark # support hdfs:///streampark/ 、
/streampark 、hdfs://host:ip/streampark/
+ remote: hdfs:///streampark # support hdfs:///streampark/ 、 /streampark
、hdfs://host:ip/streampark/
# remote docker register namespace for streampark
docker:
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 2b981b305..26af8f8ab 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -62,18 +62,15 @@ object YarnApplicationClient extends YarnClientTrait {
val providedLibs = {
val array = ListBuffer(
submitRequest.hdfsWorkspace.flinkLib,
- submitRequest.hdfsWorkspace.flinkPlugins,
submitRequest.hdfsWorkspace.appJars,
submitRequest.hdfsWorkspace.appPlugins
)
- submitRequest.developmentMode match {
- case DevelopmentMode.FLINK_SQL =>
- array +=
s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}"
- val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
- if (HdfsUtils.exists(jobLib)) {
- array += jobLib
- }
- case _ =>
+ val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
+ if (HdfsUtils.exists(jobLib)) {
+ array += jobLib
+ }
+ if (submitRequest.developmentMode == DevelopmentMode.FLINK_SQL) {
+ array +=
s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}"
}
array.toList
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 5dab91892..05562a870 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -43,8 +43,10 @@ import javax.annotation.{Nonnull, Nullable}
import java.io.File
import java.util
+import java.util.{HashSet, Set => JavaSet}
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
@@ -176,6 +178,9 @@ object MavenTool extends Logger {
buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
}
+ def resolveArtifactsAsJava(mavenArtifacts: Set[Artifact]): JavaSet[File] =
resolveArtifacts(
+ mavenArtifacts).asJava
+
/**
* Resolve the collection of artifacts. Artifacts will be downloaded to
ConfigConst.MAVEN_LOCAL_DIR
* if necessary. Note: only compile-scope dependencies will be resolved.
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 62f90935a..5abc55b92 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -116,7 +116,6 @@ case class FlinkRemotePerJobBuildRequest(
case class FlinkYarnApplicationBuildRequest(
appName: String,
mainClass: String,
- localWorkspace: String,
yarnProvidedPath: String,
developmentMode: DevelopmentMode,
dependencyInfo: DependencyInfo)
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 92c1d9199..dcf6ad86f 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.packer.pipeline.impl
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.DevelopmentMode
-import org.apache.streampark.common.fs.{FsOperator, HdfsOperator, LfsOperator}
+import org.apache.streampark.common.fs.{FsOperator, HdfsOperator}
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.packer.maven.MavenTool
import org.apache.streampark.flink.packer.pipeline._
@@ -45,9 +45,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
override protected def buildProcess(): SimpleBuildResponse = {
execStep(1) {
request.developmentMode match {
- case DevelopmentMode.FLINK_SQL =>
- LfsOperator.mkCleanDirs(request.localWorkspace)
- HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
+ case DevelopmentMode.FLINK_SQL =>
HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
case _ =>
}
logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
@@ -64,11 +62,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
}.getOrElse(throw getError.exception)
execStep(3) {
- mavenJars.foreach(
- jar => {
- uploadToHdfs(FsOperator.lfs, jar, request.localWorkspace)
- uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath)
- })
+ mavenJars.foreach(jar => uploadToHdfs(FsOperator.hdfs, jar,
request.yarnProvidedPath))
}.getOrElse(throw getError.exception)
SimpleBuildResponse()
@@ -82,29 +76,21 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
}
if (originFile.isFile) {
// check file in upload dir
- fsOperator match {
- case FsOperator.lfs =>
- fsOperator.copy(originFile.getAbsolutePath, target)
- case FsOperator.hdfs =>
- val uploadFile =
s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
- if (fsOperator.exists(uploadFile)) {
- Utils.using(new FileInputStream(originFile))(
- inputStream => {
- if (DigestUtils.md5Hex(inputStream) !=
fsOperator.fileMd5(uploadFile)) {
- fsOperator.upload(originFile.getAbsolutePath, uploadFile)
- }
- })
- } else {
- fsOperator.upload(originFile.getAbsolutePath, uploadFile)
- }
- // copy jar from upload dir to target dir
- fsOperator.copy(uploadFile, target)
+ val uploadFile = s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
+ if (fsOperator.exists(uploadFile)) {
+ Utils.using(new FileInputStream(originFile))(
+ inputStream => {
+ if (DigestUtils.md5Hex(inputStream) !=
fsOperator.fileMd5(uploadFile)) {
+ fsOperator.upload(originFile.getAbsolutePath, uploadFile)
+ }
+ })
+ } else {
+ fsOperator.upload(originFile.getAbsolutePath, uploadFile)
}
+ // copy jar from upload dir to target dir
+ fsOperator.copy(uploadFile, target)
} else {
- fsOperator match {
- case FsOperator.hdfs => fsOperator.upload(originFile.getAbsolutePath,
target)
- case _ =>
- }
+ fsOperator.upload(originFile.getAbsolutePath, target)
}
}