This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch pyflink
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit f91b6eb63d3909ceea2c5e09fbf3b38674bae79e
Author: benjobs <[email protected]>
AuthorDate: Mon Sep 4 20:26:18 2023 +0800

    [Improve] pyflink minor improvement
---
 .../flink/client/impl/YarnApplicationClient.scala  |  7 -------
 .../flink/client/trait/FlinkClientTrait.scala      | 24 ++++++++++++++--------
 2 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 8444aa2b7..374f44132 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -115,13 +115,6 @@ object YarnApplicationClient extends YarnClientTrait {
         .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
         // python.executable
         .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
-
-      val args: util.List[String] = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
-      // Caused by: java.lang.UnsupportedOperationException
-      val argsList: util.ArrayList[String] = new util.ArrayList[String](args)
-      argsList.add("-pym")
-      argsList.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
-      flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argsList)
     }
 
     logInfo(s"""
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 59aa46cc3..6399aff47 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -249,7 +249,12 @@ trait FlinkClientTrait extends Logger {
         .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
     }
 
-    val packageProgram = PackagedProgram.newBuilder
+    val programBuilder = PackagedProgram.newBuilder
+    // jar File
+    if (submitRequest.developmentMode != DevelopmentMode.PYFLINK) {
+      programBuilder.setJarFile(submitRequest.userJarFile)
+    }
+    programBuilder
       .setArguments(
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
@@ -258,8 +263,8 @@ trait FlinkClientTrait extends Logger {
         flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
       )
       .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-      .build()
 
+    val packageProgram = programBuilder.build()
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
       flinkConfig,
@@ -506,13 +511,14 @@ trait FlinkClientTrait extends Logger {
       }
     }
 
-    if (
-      submitRequest.developmentMode == DevelopmentMode.PYFLINK && !submitRequest.executionMode
-        .equals(ExecutionMode.YARN_APPLICATION)
-    ) {
-      // python file
-      programArgs.add("-py")
-      programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+    if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
+      if (submitRequest.executionMode != ExecutionMode.YARN_APPLICATION) {
+        // python file
+        programArgs.add("-py")
+        programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+      }
+      programArgs.add("-pym")
+      programArgs.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
     }
     programArgs.toList.asJava
   }

Reply via email to