This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4777fea6d Load the plugins directory of Flink home when starting the 
Yarn session cluster (#2804)
4777fea6d is described below

commit 4777fea6d904b7564917c04cd969f62270c2857f
Author: macksonmu <[email protected]>
AuthorDate: Sun Jun 18 13:33:21 2023 +0800

    Load the plugins directory of Flink home when starting the Yarn session 
cluster (#2804)
---
 .../org/apache/streampark/flink/client/impl/YarnSessionClient.scala  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 6ad07198b..fbcb38c4c 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -60,7 +60,7 @@ object YarnSessionClient extends YarnClientTrait {
    * @param deployRequest
    * @param flinkConfig
    */
-  def setConfig(deployRequest: DeployRequest, flinkConfig: Configuration): 
Unit = {
+  def deployClusterConfig(deployRequest: DeployRequest, flinkConfig: 
Configuration): Unit = {
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
       deployRequest.flinkVersion.flinkHome)
     val currentUser = UserGroupInformation.getCurrentUser
@@ -77,6 +77,7 @@ object YarnSessionClient extends YarnClientTrait {
 
     val shipFiles = new util.ArrayList[String]()
     shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/lib")
+    shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/plugins")
 
     flinkConfig
       // flinkDistJar
@@ -203,7 +204,7 @@ object YarnSessionClient extends YarnClientTrait {
     try {
       val flinkConfig =
         extractConfiguration(deployRequest.flinkVersion.flinkHome, 
deployRequest.properties)
-      setConfig(deployRequest, flinkConfig)
+      deployClusterConfig(deployRequest, flinkConfig)
       val yarnClusterDescriptor = 
getSessionClusterDeployDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
       if (null != deployRequest.clusterId && deployRequest.clusterId.nonEmpty) 
{

Reply via email to