wangyang0918 commented on a change in pull request #13322:
URL: https://github.com/apache/flink/pull/13322#discussion_r490745232
##########
File path: docs/ops/deployment/native_kubernetes.md
##########
@@ -76,7 +76,12 @@ Please follow our [configuration guide]({{ site.baseurl }}/ops/config.html) if y
If you do not specify a particular name for your session by `kubernetes.cluster-id`, the Flink client will generate a UUID name.
+<span class="label label-info">Note</span> A docker image with Python and PyFlink installed is required if you are going to start a session cluster for Python Flink Jobs.
Review comment:
All of these changes also need to be synchronized with the Chinese
documentation. Please create a follow-up ticket or do it in this PR.
##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
"or not working as expected.", e);
}
}
+
+ public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,
Review comment:
According to the code style guide, the list of thrown exceptions should not be
broken across lines.
https://flink.apache.org/contributing/code-style-and-quality-formatting.html#breaking-the-lines-of-too-long-statements
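To illustrate the formatting rule, here is a minimal sketch. The class and method names are hypothetical and not part of the PR; the point is only the layout, where the parameters are broken one per line and the complete `throws` list stays on the line of the closing parenthesis.

```java
public class ThrowsClauseFormatting {

    // Hypothetical method: parameters are broken one per line, while the
    // whole exception list stays unbroken next to the closing parenthesis.
    static String joinNonEmpty(
            String first,
            String second) throws IllegalArgumentException, IllegalStateException, UnsupportedOperationException {
        if (first.isEmpty() || second.isEmpty()) {
            throw new IllegalArgumentException("both arguments must be non-empty");
        }
        return first + "/" + second;
    }

    public static void main(String[] args) {
        System.out.println(joinNonEmpty("opt", "flink"));
    }
}
```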
##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
##########
@@ -183,7 +183,13 @@ protected void runApplication(String[] args) throws Exception {
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));
- final ProgramOptions programOptions = new ProgramOptions(commandLine);
+ ProgramOptions programOptions;
Review comment:
`programOptions` could be `final` here. Please declare local variables as
`final` whenever possible.
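A local variable can stay `final` even when it is assigned in different branches, because the compiler only requires that it is assigned exactly once on every path. A minimal sketch, where `Options` is a hypothetical stand-in for `ProgramOptions` and the branch condition is assumed:

```java
public class FinalLocalExample {

    /** Hypothetical stand-in for the real ProgramOptions class. */
    static final class Options {
        final String kind;

        Options(String kind) {
            this.kind = kind;
        }
    }

    static Options createOptions(boolean isPython) {
        // Declared final without an initializer; the compiler verifies
        // that each branch assigns the variable exactly once.
        final Options options;
        if (isPython) {
            options = new Options("python");
        } else {
            options = new Options("jar");
        }
        return options;
    }

    public static void main(String[] args) {
        System.out.println(createOptions(true).kind);
        System.out.println(createOptions(false).kind);
    }
}
```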
##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
##########
@@ -53,10 +56,10 @@
public enum PackagedProgramUtils {
;
- private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver";
-
private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer";
+ public static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver";
Review comment:
It would be better to keep the constant private and add a public static
method `getPythonDriverClass` instead.
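A minimal sketch of what I mean. The enum name `PythonProgramNames` is hypothetical and only mirrors the utility-enum pattern of `PackagedProgramUtils`; the constant value is the one from the diff.

```java
public enum PythonProgramNames {
    ;

    // The constant stays private so callers do not depend on the field itself.
    private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver";

    /** Returns the fully qualified name of the Python driver class. */
    public static String getPythonDriverClass() {
        return PYTHON_DRIVER_CLASS_NAME;
    }
}
```

Callers would then use `PythonProgramNames.getPythonDriverClass()` instead of reading the field directly.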
##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
"or not working as expected.", e);
}
}
+
+ public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,
+ NoSuchFieldException, IllegalAccessException {
+ ProgramOptions pythonProgramOptions = createPythonProgramOptions(line);
+ Field jarFilePath = pythonProgramOptions.getClass().getSuperclass().getDeclaredField("jarFilePath");
+ jarFilePath.setAccessible(true);
+ // This is the python jar path in client, which is invalid at runtime and it will be replaced with the actual
+ // path when retrieving the packaged program in the job manager container.
+ String pythonJarPath = "local:///opt/flink/opt/" + FilenameUtils.getName(PackagedProgramUtils.getPythonJar()
+ .getPath());
+ jarFilePath.set(pythonProgramOptions, pythonJarPath);
+ return pythonProgramOptions;
+ }
+
+ public static void configurePythonExecution(Configuration configuration,
Review comment:
Same as above.
##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -111,6 +112,20 @@ private ClassPathPackagedProgramRetriever(
@Override
public PackagedProgram getPackagedProgram() throws FlinkException {
try {
+
+ // It is Python job if program arguments contain "-py"/--python" or "-pym/--pyModule", set the fixed
+ // jobClassName and jarFile path.
+ if (PackagedProgramUtils.isPython(jobClassName) || PackagedProgramUtils.isPython(programArguments)){
+ String pythonJobClassName = PackagedProgramUtils.PYTHON_DRIVER_CLASS_NAME;
+ File pythonJarFile = new File(PackagedProgramUtils.getPythonJar().getPath());
Review comment:
Could we get a correct `pythonJarFile` for Yarn per-job/application mode
here? It seems that we neither ship the `$FLINK_HOME/opt` directory nor set the
`FLINK_OPT_DIR` environment variable there.
Maybe this is out of the scope of this PR.
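To make the concern concrete, here is a minimal sketch under the assumption that the jar location is derived from the `FLINK_OPT_DIR` environment variable. The class and the method `resolveOptDir` are hypothetical and not taken from the PR; the environment is passed in as a map so the unset case is easy to see.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

public class OptDirExample {

    // Hypothetical helper: returns the opt directory if FLINK_OPT_DIR is set,
    // or an empty Optional if the variable is missing or empty.
    static Optional<Path> resolveOptDir(Map<String, String> environment) {
        String value = environment.get("FLINK_OPT_DIR");
        if (value == null || value.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(Paths.get(value));
    }

    public static void main(String[] args) {
        // On a container where the variable was never exported this is empty.
        System.out.println(resolveOptDir(System.getenv()).isPresent());
    }
}
```

If the variable is not set in the Yarn container, a lookup like this has nothing to return, so the retriever would need an explicit fallback or a clear error.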
##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
"or not working as expected.", e);
}
}
+
+ public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,
+ NoSuchFieldException, IllegalAccessException {
+ ProgramOptions pythonProgramOptions = createPythonProgramOptions(line);
+ Field jarFilePath = pythonProgramOptions.getClass().getSuperclass().getDeclaredField("jarFilePath");
+ jarFilePath.setAccessible(true);
+ // This is the python jar path in client, which is invalid at runtime and it will be replaced with the actual
+ // path when retrieving the packaged program in the job manager container.
+ String pythonJarPath = "local:///opt/flink/opt/" + FilenameUtils.getName(PackagedProgramUtils.getPythonJar()
+ .getPath());
+ jarFilePath.set(pythonProgramOptions, pythonJarPath);
+ return pythonProgramOptions;
+ }
+
+ public static void configurePythonExecution(Configuration configuration,
+ PackagedProgram packagedProgram) throws CliArgsException,
+ NoSuchFieldException, IllegalAccessException {
+
+ final Options commandOptions = CliFrontendParser.getRunCommandOptions();
+ final CommandLine commandLine = CliFrontendParser.parse(commandOptions, packagedProgram.getArguments(),
+ true);
+ final ProgramOptions programOptions = createPythonProgramOptions(commandLine);
+
+ //Extract real program args by eliminating the PyFlink dependency options
+ String[] programArgs = programOptions.extractProgramArgs(commandLine);
+ //Set the real program args to the packaged program
+ Field argsField = packagedProgram.getClass().getDeclaredField("args");
+ argsField.setAccessible(true);
+ argsField.set(packagedProgram, programArgs);
+
+ //PyFlink dependency configurations are set in the pythonConfiguration when constructing the program option,
+ // we need to get the python configuration and merge with the execution configuration.
+ Field pythonConfiguration = programOptions.getClass().getDeclaredField("pythonConfiguration");
Review comment:
I wonder whether adding a public setter for `pythonConfiguration` would be a
better solution. As written, it is really hard to see that we are trying to
update a private field via reflection.
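As a rough sketch of the alternative, assuming the class can expose an accessor: `PythonOptions` below is a hypothetical stand-in for the real options class, a plain `Map` stands in for Flink's `Configuration`, and the key `python.example-option` is made up. A getter is shown because the quoted code reads the field before merging; a setter would follow the same pattern.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AccessorExample {

    /** Hypothetical stand-in for the options class holding the Python configuration. */
    static final class PythonOptions {
        private final Map<String, String> pythonConfiguration = new HashMap<>();

        PythonOptions(String exampleValue) {
            // "python.example-option" is an invented key for illustration only.
            pythonConfiguration.put("python.example-option", exampleValue);
        }

        // Public accessor: callers no longer need getDeclaredField/setAccessible.
        Map<String, String> getPythonConfiguration() {
            return Collections.unmodifiableMap(pythonConfiguration);
        }
    }

    // Merges the Python settings into a copy of the execution configuration.
    static Map<String, String> merge(Map<String, String> executionConfig, PythonOptions options) {
        Map<String, String> merged = new HashMap<>(executionConfig);
        merged.putAll(options.getPythonConfiguration());
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> execution = new HashMap<>();
        execution.put("parallelism.default", "2");
        System.out.println(merge(execution, new PythonOptions("value")));
    }
}
```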
##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
"or not working as expected.", e);
}
}
+
+ public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,
+ NoSuchFieldException, IllegalAccessException {
+ ProgramOptions pythonProgramOptions = createPythonProgramOptions(line);
+ Field jarFilePath = pythonProgramOptions.getClass().getSuperclass().getDeclaredField("jarFilePath");
+ jarFilePath.setAccessible(true);
+ // This is the python jar path in client, which is invalid at runtime and it will be replaced with the actual
+ // path when retrieving the packaged program in the job manager container.
+ String pythonJarPath = "local:///opt/flink/opt/" + FilenameUtils.getName(PackagedProgramUtils.getPythonJar()
Review comment:
I am trying to understand this piece of code. First, we have to access the
`jarFilePath` field of `ProgramOptions` via reflection, since it is a private
field. Then we override it with `local:///opt/flink/opt/*python*.jar`. I am
afraid this will not work for Yarn application mode, and I strongly suggest
not relying on this temporary hack.
I will also give this more thought to find a proper solution.
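For reference, this is my understanding of what the path rewrite computes, as a minimal sketch. The class, the method `toContainerPath`, and the jar file name are hypothetical; only the `local:///opt/flink/opt/` prefix comes from the diff, and `java.nio` is used here in place of `FilenameUtils`.

```java
import java.nio.file.Paths;

public class ContainerJarPath {

    // Prefix hard-coded in the diff; it assumes the image layout /opt/flink/opt.
    private static final String CONTAINER_OPT_PREFIX = "local:///opt/flink/opt/";

    // Keeps only the file name of the client-side jar and prepends the prefix.
    static String toContainerPath(String clientJarPath) {
        String fileName = Paths.get(clientJarPath).getFileName().toString();
        return CONTAINER_OPT_PREFIX + fileName;
    }

    public static void main(String[] args) {
        // The jar name below is made up for illustration.
        System.out.println(toContainerPath("/home/user/flink/opt/flink-python-example.jar"));
    }
}
```

The result is only valid where the jar really lives under `/opt/flink/opt`, which is why I doubt it holds for Yarn application mode.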