This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch pyflink
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit f91b6eb63d3909ceea2c5e09fbf3b38674bae79e
Author: benjobs <[email protected]>
AuthorDate: Mon Sep 4 20:26:18 2023 +0800

    [Improve] pyflink minor improvement
---
 .../flink/client/impl/YarnApplicationClient.scala  |  7 -------
 .../flink/client/trait/FlinkClientTrait.scala      | 24 ++++++++++++++--------
 2 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 8444aa2b7..374f44132 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -115,13 +115,6 @@ object YarnApplicationClient extends YarnClientTrait {
         .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
         // python.executable
         .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
-
-      val args: util.List[String] = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
-      // Caused by: java.lang.UnsupportedOperationException
-      val argsList: util.ArrayList[String] = new util.ArrayList[String](args)
-      argsList.add("-pym")
-      argsList.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
-      flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argsList)
     }
 
     logInfo(s"""
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 59aa46cc3..6399aff47 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -249,7 +249,12 @@ trait FlinkClientTrait extends Logger {
         .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
     }
 
-    val packageProgram = PackagedProgram.newBuilder
+    val programBuilder = PackagedProgram.newBuilder
+    // jar File
+    if (submitRequest.developmentMode != DevelopmentMode.PYFLINK) {
+      programBuilder.setJarFile(submitRequest.userJarFile)
+    }
+    programBuilder
       .setArguments(
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
@@ -258,8 +263,8 @@ trait FlinkClientTrait extends Logger {
         flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
       )
       .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-      .build()
 
+    val packageProgram = programBuilder.build()
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
       flinkConfig,
@@ -506,13 +511,14 @@ trait FlinkClientTrait extends Logger {
       }
     }
 
-    if (
-      submitRequest.developmentMode == DevelopmentMode.PYFLINK && !submitRequest.executionMode
-        .equals(ExecutionMode.YARN_APPLICATION)
-    ) {
-      // python file
-      programArgs.add("-py")
-      programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+    if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
+      if (submitRequest.executionMode != ExecutionMode.YARN_APPLICATION) {
+        // python file
+        programArgs.add("-py")
+        programArgs.add(submitRequest.userJarFile.getAbsolutePath)
+      }
+      programArgs.add("-pym")
+      programArgs.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
     }
     programArgs.toList.asJava
   }
