This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.1 by this push:
new 5ec700bdd Load the plugins directory of Flink home when starting the
Yarn sessi… (#2800)
5ec700bdd is described below
commit 5ec700bddb01ddb878000f2f1fbe1431696aec5a
Author: macksonmu <[email protected]>
AuthorDate: Sun Jun 18 13:34:11 2023 +0800
Load the plugins directory of Flink home when starting the Yarn sessi…
(#2800)
---
.../org/apache/streampark/flink/client/impl/YarnSessionClient.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index bbcd0cdaf..0ef54d5c1 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -60,7 +60,7 @@ object YarnSessionClient extends YarnClientTrait {
* @param deployRequest
* @param flinkConfig
*/
- def setConfig(deployRequest: DeployRequest, flinkConfig: Configuration):
Unit = {
+ def deployClusterConfig(deployRequest: DeployRequest, flinkConfig:
Configuration): Unit = {
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
deployRequest.flinkVersion.flinkHome)
val currentUser = UserGroupInformation.getCurrentUser
@@ -77,6 +77,7 @@ object YarnSessionClient extends YarnClientTrait {
val shipFiles = new util.ArrayList[String]()
shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/lib")
+ shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/plugins")
flinkConfig
// flinkDistJar
@@ -203,7 +204,7 @@ object YarnSessionClient extends YarnClientTrait {
try {
val flinkConfig =
extractConfiguration(deployRequest.flinkVersion.flinkHome,
deployRequest.properties)
- setConfig(deployRequest, flinkConfig)
+ deployClusterConfig(deployRequest, flinkConfig)
val yarnClusterDescriptor =
getSessionClusterDeployDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
if (null != deployRequest.clusterId && deployRequest.clusterId.nonEmpty)
{