This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch yarn-app
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/yarn-app by this push:
new 3cc157edb [Improve] flink yarn-app mode improvement
3cc157edb is described below
commit 3cc157edb2e736e87cc17cc74d28413c17e938d0
Author: benjobs <[email protected]>
AuthorDate: Thu Nov 9 14:01:42 2023 +0800
[Improve] flink yarn-app mode improvement
---
.../core/service/impl/AppBuildPipeServiceImpl.java | 25 +++++++++++-----------
.../flink/client/impl/YarnApplicationClient.scala | 1 -
2 files changed, 13 insertions(+), 13 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 2b4cc9253..965f21e70 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -91,7 +91,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -390,29 +389,31 @@ public class AppBuildPipeServiceImpl
if (app.isCustomCodeJob()) {
// customCode upload jar to appHome...
FsOperator fsOperator = app.getFsOperator();
+
if (app.isUploadJob()) {
// 1). upload jar to local upload.
File localJar = new File(WebUtils.getAppTempDir(), app.getJar());
File uploadJar = new File(localUploadDIR, app.getJar());
checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar,
localUploadDIR);
+
if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
List<File> jars = new ArrayList<>(0);
+
+ // 1). user jar
jars.add(uploadJar);
- // 2). jar dependency to local upload
- if (!app.getDependencyObject().getJar().isEmpty()) {
- for (String jar : app.getDependencyObject().getJar()) {
- jars.add(new File(localUploadDIR, jar));
- }
- }
+ // 2). jar dependency
+ app.getDependencyObject()
+ .getJar()
+ .forEach(jar -> jars.add(new File(localUploadDIR, jar)));
- // 3. pom dependency to local upload
+ // 3). pom dependency
if (!app.getDependencyInfo().mavenArts().isEmpty()) {
- Set<File> dependJars =
-
MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts());
- jars.addAll(dependJars);
+
jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts()));
}
- fsOperator.mkdirs(app.getAppLib());
+
+ fsOperator.mkCleanDirs(app.getAppLib());
+ // upload the collected jars to the application's lib dir (app.getAppLib())
jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(),
app.getAppLib()));
}
} else {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index b84aae322..26af8f8ab 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -62,7 +62,6 @@ object YarnApplicationClient extends YarnClientTrait {
val providedLibs = {
val array = ListBuffer(
submitRequest.hdfsWorkspace.flinkLib,
- submitRequest.hdfsWorkspace.flinkPlugins,
submitRequest.hdfsWorkspace.appJars,
submitRequest.hdfsWorkspace.appPlugins
)