This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.1 by this push:
     new 5ec700bdd Load the plugins directory of Flink home when starting the 
Yarn sessi… (#2800)
5ec700bdd is described below

commit 5ec700bddb01ddb878000f2f1fbe1431696aec5a
Author: macksonmu <[email protected]>
AuthorDate: Sun Jun 18 13:34:11 2023 +0800

    Load the plugins directory of Flink home when starting the Yarn sessi… 
(#2800)
---
 .../org/apache/streampark/flink/client/impl/YarnSessionClient.scala  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index bbcd0cdaf..0ef54d5c1 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -60,7 +60,7 @@ object YarnSessionClient extends YarnClientTrait {
    * @param deployRequest
    * @param flinkConfig
    */
-  def setConfig(deployRequest: DeployRequest, flinkConfig: Configuration): 
Unit = {
+  def deployClusterConfig(deployRequest: DeployRequest, flinkConfig: 
Configuration): Unit = {
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
       deployRequest.flinkVersion.flinkHome)
     val currentUser = UserGroupInformation.getCurrentUser
@@ -77,6 +77,7 @@ object YarnSessionClient extends YarnClientTrait {
 
     val shipFiles = new util.ArrayList[String]()
     shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/lib")
+    shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/plugins")
 
     flinkConfig
       // flinkDistJar
@@ -203,7 +204,7 @@ object YarnSessionClient extends YarnClientTrait {
     try {
       val flinkConfig =
         extractConfiguration(deployRequest.flinkVersion.flinkHome, 
deployRequest.properties)
-      setConfig(deployRequest, flinkConfig)
+      deployClusterConfig(deployRequest, flinkConfig)
       val yarnClusterDescriptor = 
getSessionClusterDeployDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
       if (null != deployRequest.clusterId && deployRequest.clusterId.nonEmpty) 
{

Reply via email to