This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 07e57fab6 [Bug] yarn session cluster startup failed (#2656)
07e57fab6 is described below
commit 07e57fab61150d989350dec03f2d4f074d09af31
Author: zhoulii <[email protected]>
AuthorDate: Thu Apr 20 10:43:46 2023 +0800
[Bug] yarn session cluster startup failed (#2656)
Co-authored-by: zhoulii <[email protected]>
---
.../flink/client/impl/YarnSessionClient.scala | 18 +++++++-----------
1 file changed, 7 insertions(+), 11 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 595e983ea..bbcd0cdaf 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.{ApplicationId,
FinalApplicationStatus
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.ConverterUtils
+import java.util
+
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
@@ -72,21 +74,15 @@ object YarnSessionClient extends YarnClientTrait {
s"Hadoop security with Kerberos is enabled but the login user
$currentUser does not have Kerberos credentials or delegation tokens!")
}
}
- val providedLibs = {
- val array = ListBuffer(
- deployRequest.hdfsWorkspace.flinkLib,
- deployRequest.hdfsWorkspace.flinkPlugins,
- deployRequest.hdfsWorkspace.appJars,
- deployRequest.hdfsWorkspace.appPlugins
- )
- array.toList
- }
+
+ val shipFiles = new util.ArrayList[String]()
+ shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/lib")
flinkConfig
- // yarn.provided.lib.dirs
- .safeSet(YarnConfigOptions.PROVIDED_LIB_DIRS, providedLibs.asJava)
// flinkDistJar
.safeSet(YarnConfigOptions.FLINK_DIST_JAR,
deployRequest.hdfsWorkspace.flinkDistJar)
+ // flink lib
+ .safeSet(YarnConfigOptions.SHIP_FILES, shipFiles)
// yarnDeployment Target
.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
// conf dir