wangyang0918 commented on a change in pull request #13322:
URL: https://github.com/apache/flink/pull/13322#discussion_r492421630
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
##########
@@ -183,15 +183,25 @@ protected void runApplication(String[] args) throws Exception {
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));
- final ProgramOptions programOptions = new ProgramOptions(commandLine);
-
final ApplicationDeployer deployer =
- new ApplicationClusterDeployer(clusterClientServiceLoader);
+ new ApplicationClusterDeployer(clusterClientServiceLoader);
- programOptions.validate();
- final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
- final Configuration effectiveConfiguration = getEffectiveConfiguration(
+ final ProgramOptions programOptions;
+ final Configuration effectiveConfiguration;
+
+ // No need to set a jarFile path for Pyflink job.
+ if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
+ programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
+ effectiveConfiguration = getEffectiveConfiguration(
+ activeCommandLine, commandLine, programOptions, Collections.EMPTY_LIST);
Review comment:
Maybe we could use `Collections.emptyList()` here to avoid an unchecked
warning.
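For illustration, a minimal sketch of the difference. The `countJars` method is a hypothetical stand-in for a method with a typed list parameter, not the real `getEffectiveConfiguration` signature, and the inference in `withTypedEmptyList` assumes Java 8 or later.

```java
import java.util.Collections;
import java.util.List;

class EmptyListExample {
    // Hypothetical method with a typed list parameter (an assumption made
    // for this sketch, not a Flink API).
    static int countJars(List<String> jars) {
        return jars.size();
    }

    static int withTypedEmptyList() {
        // Collections.emptyList() is generic; the element type is inferred
        // from the parameter type, so the compiler reports no unchecked warning.
        return countJars(Collections.emptyList());
    }

    @SuppressWarnings("unchecked")
    static int withRawConstant() {
        // Collections.EMPTY_LIST is a raw List. Passing it where List<String>
        // is expected is an unchecked conversion (suppressed here on purpose).
        return countJars(Collections.EMPTY_LIST);
    }

    public static void main(String[] args) {
        System.out.println(withTypedEmptyList());
        System.out.println(withRawConstant());
    }
}
```

Both calls behave the same at runtime; only the compile-time type checking differs.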
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
"or not working as expected.", e);
}
}
+
+ public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,
+ NoSuchFieldException, IllegalAccessException {
+ ProgramOptions pythonProgramOptions = createPythonProgramOptions(line);
+ Field jarFilePath = pythonProgramOptions.getClass().getSuperclass().getDeclaredField("jarFilePath");
+ jarFilePath.setAccessible(true);
+ // This is the python jar path in client, which is invalid at runtime and it will be replaced with the actual
+ // path when retrieving the packaged program in the job manager container.
+ String pythonJarPath = "local:///opt/flink/opt/" + FilenameUtils.getName(PackagedProgramUtils.getPythonJar()
+ .getPath());
+ jarFilePath.set(pythonProgramOptions, pythonJarPath);
+ return pythonProgramOptions;
+ }
+
+ public static void configurePythonExecution(Configuration configuration,
+ PackagedProgram packagedProgram) throws CliArgsException,
+ NoSuchFieldException, IllegalAccessException {
+
+ final Options commandOptions = CliFrontendParser.getRunCommandOptions();
+ final CommandLine commandLine = CliFrontendParser.parse(commandOptions, packagedProgram.getArguments(),
+ true);
+ final ProgramOptions programOptions = createPythonProgramOptions(commandLine);
+
+ //Extract real program args by eliminating the PyFlink dependency options
+ String[] programArgs = programOptions.extractProgramArgs(commandLine);
+ //Set the real program args to the packaged program
+ Field argsField = packagedProgram.getClass().getDeclaredField("args");
+ argsField.setAccessible(true);
+ argsField.set(packagedProgram, programArgs);
+
+ //PyFlink dependency configurations are set in the pythonConfiguration when constructing the program option,
+ // we need to get the python configuration and merge with the execution configuration.
+ Field pythonConfiguration = programOptions.getClass().getDeclaredField("pythonConfiguration");
Review comment:
Even though we have to use reflection here, I strongly suggest adding a
unit test to cover this logic. It is really fragile.
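As a rough sketch of what such a test would guard against: the classes below are hypothetical stand-ins (`BaseOptions`, `PythonOptions`), not the real `ProgramOptions` hierarchy, and the jar name `example.jar` is made up. The check fails as soon as the private field is renamed or moved, which is the breakage a unit test should catch.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a base options class with a private field.
class BaseOptions {
    private String jarFilePath;

    String getJarFilePath() {
        return jarFilePath;
    }
}

// Hypothetical stand-in for the Python-specific subclass.
class PythonOptions extends BaseOptions {
}

class ReflectionGuard {
    // Same pattern as in the diff: look up a private field declared on the
    // superclass by name and overwrite its value.
    static void setJarFilePath(BaseOptions options, String path)
            throws NoSuchFieldException, IllegalAccessException {
        Field field = options.getClass().getSuperclass().getDeclaredField("jarFilePath");
        field.setAccessible(true);
        field.set(options, path);
    }

    // The assertion a unit test would make: the field lookup succeeds and the
    // value written through reflection is visible through the normal getter.
    static boolean reflectionStillWorks() {
        try {
            PythonOptions options = new PythonOptions();
            String path = "local:///opt/flink/opt/example.jar"; // made-up jar name
            setJarFilePath(options, path);
            return path.equals(options.getJarFilePath());
        } catch (ReflectiveOperationException e) {
            // Thrown if the field was renamed, moved, or is inaccessible.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(reflectionStillWorks());
    }
}
```

In the real code base the same check would be written as a test against the actual classes, so that a refactoring of the field names fails the build instead of failing at job submission.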