This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 4777fea6d Load the plugins directory of Flink home when starting the Yarn session cluster (#2804)
4777fea6d is described below
commit 4777fea6d904b7564917c04cd969f62270c2857f
Author: macksonmu <[email protected]>
AuthorDate: Sun Jun 18 13:33:21 2023 +0800
Load the plugins directory of Flink home when starting the Yarn session cluster (#2804)
---
.../org/apache/streampark/flink/client/impl/YarnSessionClient.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 6ad07198b..fbcb38c4c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -60,7 +60,7 @@ object YarnSessionClient extends YarnClientTrait {
* @param deployRequest
* @param flinkConfig
*/
- def setConfig(deployRequest: DeployRequest, flinkConfig: Configuration): Unit = {
+ def deployClusterConfig(deployRequest: DeployRequest, flinkConfig: Configuration): Unit = {
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
deployRequest.flinkVersion.flinkHome)
val currentUser = UserGroupInformation.getCurrentUser
@@ -77,6 +77,7 @@ object YarnSessionClient extends YarnClientTrait {
val shipFiles = new util.ArrayList[String]()
shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/lib")
+ shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/plugins")
flinkConfig
// flinkDistJar
@@ -203,7 +204,7 @@ object YarnSessionClient extends YarnClientTrait {
try {
val flinkConfig =
extractConfiguration(deployRequest.flinkVersion.flinkHome, deployRequest.properties)
- setConfig(deployRequest, flinkConfig)
+ deployClusterConfig(deployRequest, flinkConfig)
val yarnClusterDescriptor = getSessionClusterDeployDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
if (null != deployRequest.clusterId && deployRequest.clusterId.nonEmpty) {