This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 07e57fab6  [Bug] yarn session cluster startup failed (#2656)
07e57fab6 is described below

commit 07e57fab61150d989350dec03f2d4f074d09af31
Author: zhoulii <[email protected]>
AuthorDate: Thu Apr 20 10:43:46 2023 +0800

     [Bug] yarn session cluster startup failed (#2656)
    
    Co-authored-by: zhoulii <[email protected]>
---
 .../flink/client/impl/YarnSessionClient.scala          | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 595e983ea..bbcd0cdaf 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.ConverterUtils
 
+import java.util
+
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
@@ -72,21 +74,15 @@ object YarnSessionClient extends YarnClientTrait {
           s"Hadoop security with Kerberos is enabled but the login user $currentUser does not have Kerberos credentials or delegation tokens!")
       }
     }
-    val providedLibs = {
-      val array = ListBuffer(
-        deployRequest.hdfsWorkspace.flinkLib,
-        deployRequest.hdfsWorkspace.flinkPlugins,
-        deployRequest.hdfsWorkspace.appJars,
-        deployRequest.hdfsWorkspace.appPlugins
-      )
-      array.toList
-    }
+
+    val shipFiles = new util.ArrayList[String]()
+    shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/lib")
 
     flinkConfig
-      // yarn.provided.lib.dirs
-      .safeSet(YarnConfigOptions.PROVIDED_LIB_DIRS, providedLibs.asJava)
       // flinkDistJar
       .safeSet(YarnConfigOptions.FLINK_DIST_JAR, deployRequest.hdfsWorkspace.flinkDistJar)
+      // flink lib
+      .safeSet(YarnConfigOptions.SHIP_FILES, shipFiles)
       // yarnDeployment Target
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
       // conf dir

Reply via email to