wangyang0918 commented on a change in pull request #13322:
URL: https://github.com/apache/flink/pull/13322#discussion_r490745232



##########
File path: docs/ops/deployment/native_kubernetes.md
##########
@@ -76,7 +76,12 @@ Please follow our [configuration guide]({{ site.baseurl }}/ops/config.html) if y
 
 If you do not specify a particular name for your session by `kubernetes.cluster-id`, the Flink client will generate a UUID name.
 
+<span class="label label-info">Note</span> A docker image with Python and PyFlink installed is required if you are going to start a session cluster for Python Flink Jobs.

Review comment:
       All the changes need to be synchronized with the Chinese documentation. Please create a new ticket or just do it in this PR.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
                                        "or not working as expected.", e);
                }
        }
+
+       public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,

Review comment:
       According to the code style guide, we should not break the list of thrown exceptions; it stays together on the last line of the declaration.
https://flink.apache.org/contributing/code-style-and-quality-formatting.html#breaking-the-lines-of-too-long-statements
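As we read the linked guide, a too-long declaration is broken by putting every parameter (including the first) on its own line with two extra indentation levels, while the `throws` list stays whole on the last line even if that line exceeds the limit. A minimal sketch with a hypothetical method (not the PR's code; the indentation width is illustrative):

```java
import java.io.IOException;

final class ThrowsClauseStyleSketch {

    // Hypothetical method: parameters are broken one per line, and the
    // thrown exception list is kept together on the last declaration line.
    static String describeApplication(
            String clusterId,
            String containerImage,
            int parallelism) throws IOException, NoSuchFieldException, IllegalAccessException {
        return clusterId + " runs " + containerImage + " with parallelism " + parallelism;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describeApplication("my-session", "my-pyflink-image", 2));
    }
}
```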
   
   

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
##########
@@ -183,7 +183,13 @@ protected void runApplication(String[] args) throws Exception {
                final CustomCommandLine activeCommandLine =
                                validateAndGetActiveCommandLine(checkNotNull(commandLine));
 
-               final ProgramOptions programOptions = new ProgramOptions(commandLine);
+               ProgramOptions programOptions;

Review comment:
       `programOptions` could be `final` here. Please always try to declare local variables as `final`.
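Splitting the declaration from the assignment does not require dropping `final`: Java's definite-assignment rules accept a `final` local that is assigned exactly once on every branch. A minimal sketch with hypothetical names, since the branches that follow in `runApplication` are not visible in this hunk:

```java
final class FinalLocalSketch {

    // Hypothetical helper for illustration only.
    static String resolveEntryPoint(boolean isPythonJob, String userClassName) {
        // Declared final without an initializer; the compiler checks that
        // each branch below assigns it exactly once.
        final String entryPoint;
        if (isPythonJob) {
            entryPoint = "org.apache.flink.client.python.PythonDriver";
        } else {
            entryPoint = userClassName;
        }
        return entryPoint;
    }

    public static void main(String[] args) {
        System.out.println(resolveEntryPoint(false, "com.example.WordCount"));
    }
}
```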

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
##########
@@ -53,10 +56,10 @@
 public enum PackagedProgramUtils {
        ;
 
-       private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver";
-
        private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer";
 
+       public static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver";

Review comment:
       Adding a public static method `getPythonDriverClass` would be better.
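A minimal sketch of the suggested accessor, keeping the constant private inside a constant-less enum like `PackagedProgramUtils`. The review does not say whether `getPythonDriverClass` should return the class name or a `Class<?>`; returning the name as a `String` is an assumption here:

```java
enum PythonDriverAccessorSketch {
    ;

    // The constant keeps its original private visibility.
    private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver";

    // Accessor named after the review suggestion; the String return type is
    // an assumption of this sketch.
    public static String getPythonDriverClass() {
        return PYTHON_DRIVER_CLASS_NAME;
    }
}
```

Callers such as `ClassPathPackagedProgramRetriever` would then call the accessor instead of reading the field directly, so the constant's visibility does not need to change.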

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
                                        "or not working as expected.", e);
                }
        }
+
+       public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,
+               NoSuchFieldException, IllegalAccessException {
+               ProgramOptions pythonProgramOptions = createPythonProgramOptions(line);
+               Field jarFilePath = pythonProgramOptions.getClass().getSuperclass().getDeclaredField("jarFilePath");
+               jarFilePath.setAccessible(true);
+               // This is the python jar path in client, which is invalid at runtime and it will be replaced with the actual
+               // path when retrieving the packaged program in the job manager container.
+               String pythonJarPath = "local:///opt/flink/opt/" + FilenameUtils.getName(PackagedProgramUtils.getPythonJar()
+                       .getPath());
+               jarFilePath.set(pythonProgramOptions, pythonJarPath);
+               return pythonProgramOptions;
+       }
+
+       public static void configurePythonExecution(Configuration configuration,

Review comment:
       Same as above.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java
##########
@@ -111,6 +112,20 @@ private ClassPathPackagedProgramRetriever(
        @Override
        public PackagedProgram getPackagedProgram() throws FlinkException {
                try {
+
+                       // It is Python job if program arguments contain "-py"/--python" or "-pym/--pyModule", set the fixed
+                       // jobClassName and jarFile path.
+                       if (PackagedProgramUtils.isPython(jobClassName) || PackagedProgramUtils.isPython(programArguments)){
+                               String pythonJobClassName = PackagedProgramUtils.PYTHON_DRIVER_CLASS_NAME;
+                               File pythonJarFile = new File(PackagedProgramUtils.getPythonJar().getPath());

Review comment:
       Could we get a correct `pythonJarFile` for YARN per-job/application mode here? It seems that we neither ship the $FLINK_HOME/opt directory nor set the `FLINK_OPT_DIR` environment variable.
   
   Maybe this is out of the scope of this PR.
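If the jar is resolved from `FLINK_OPT_DIR`, one option is to fail with a descriptive error when the variable or the jar is missing, instead of silently building a path that does not exist in the container. A minimal sketch; the `flink-python*.jar` glob is an assumption about the jar's file name, and this does not solve shipping `$FLINK_HOME/opt` to YARN containers:

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;

final class OptDirPythonJarSketch {

    // Returns the first jar in optDir matching the glob. The "flink-python*.jar"
    // pattern is an assumption about how the PyFlink jar is named.
    static Path findPythonJar(Path optDir) throws IOException {
        if (!Files.isDirectory(optDir)) {
            throw new FileNotFoundException("opt directory does not exist: " + optDir);
        }
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(optDir, "flink-python*.jar")) {
            Iterator<Path> it = jars.iterator();
            if (it.hasNext()) {
                return it.next();
            }
        }
        throw new FileNotFoundException("no flink-python*.jar found in " + optDir);
    }

    // Reads FLINK_OPT_DIR and fails loudly if it is unset, e.g. when
    // $FLINK_HOME/opt was not shipped to the container.
    static Path findPythonJarFromEnvironment() throws IOException {
        String optDir = System.getenv("FLINK_OPT_DIR");
        if (optDir == null) {
            throw new IllegalStateException("FLINK_OPT_DIR is not set in this container");
        }
        return findPythonJar(Paths.get(optDir));
    }

    public static void main(String[] args) throws IOException {
        System.out.println(findPythonJarFromEnvironment());
    }
}
```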

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
                                        "or not working as expected.", e);
                }
        }
+
+       public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,
+               NoSuchFieldException, IllegalAccessException {
+               ProgramOptions pythonProgramOptions = createPythonProgramOptions(line);
+               Field jarFilePath = pythonProgramOptions.getClass().getSuperclass().getDeclaredField("jarFilePath");
+               jarFilePath.setAccessible(true);
+               // This is the python jar path in client, which is invalid at runtime and it will be replaced with the actual
+               // path when retrieving the packaged program in the job manager container.
+               String pythonJarPath = "local:///opt/flink/opt/" + FilenameUtils.getName(PackagedProgramUtils.getPythonJar()
+                       .getPath());
+               jarFilePath.set(pythonProgramOptions, pythonJarPath);
+               return pythonProgramOptions;
+       }
+
+       public static void configurePythonExecution(Configuration configuration,
+                                                                                               PackagedProgram packagedProgram) throws CliArgsException,
+               NoSuchFieldException, IllegalAccessException {
+
+                       final Options commandOptions = CliFrontendParser.getRunCommandOptions();
+                       final CommandLine commandLine = CliFrontendParser.parse(commandOptions, packagedProgram.getArguments(),
+                               true);
+                       final ProgramOptions programOptions = createPythonProgramOptions(commandLine);
+
+                       //Extract real program args by eliminating the PyFlink dependency options
+                       String[] programArgs = programOptions.extractProgramArgs(commandLine);
+                       //Set the real program args to the packaged program
+                       Field argsField = packagedProgram.getClass().getDeclaredField("args");
+                       argsField.setAccessible(true);
+                       argsField.set(packagedProgram, programArgs);
+
+                       //PyFlink dependency configurations are set in the pythonConfiguration when constructing the program option,
+                       // we need to get the python configuration and merge with the execution configuration.
+                       Field pythonConfiguration = programOptions.getClass().getDeclaredField("pythonConfiguration");

Review comment:
       I wonder whether adding a public setter for `pythonConfiguration` would be a better solution. Currently, it is really hard to tell that we are updating a private field via reflection.
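A minimal sketch of replacing the reflective access with explicit accessors. Assumptions: a plain `Map<String, String>` stands in for Flink's `Configuration`, the hypothetical `ProgramOptionsSketch` stands in for `ProgramOptions`, and Python settings taking precedence during the merge is a guess, since the rest of `configurePythonExecution` is not visible here:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class PythonConfigurationAccessorSketch {

    // Hypothetical stand-in for ProgramOptions with explicit accessors
    // instead of a private field read through reflection.
    static final class ProgramOptionsSketch {
        private Map<String, String> pythonConfiguration = new HashMap<>();

        Map<String, String> getPythonConfiguration() {
            return Collections.unmodifiableMap(pythonConfiguration);
        }

        void setPythonConfiguration(Map<String, String> pythonConfiguration) {
            this.pythonConfiguration = new HashMap<>(pythonConfiguration);
        }
    }

    // Returns a new map: the execution settings overlaid with the PyFlink
    // dependency settings (Python keys win on conflict, an assumption).
    static Map<String, String> mergeIntoExecutionConfig(
            Map<String, String> executionConfig,
            ProgramOptionsSketch options) {
        Map<String, String> merged = new HashMap<>(executionConfig);
        merged.putAll(options.getPythonConfiguration());
        return merged;
    }
}
```

The call site then reads as an ordinary method call, which makes it obvious what is being merged and removes the `NoSuchFieldException`/`IllegalAccessException` from the signature.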

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
##########
@@ -93,4 +98,40 @@ public static ProgramOptions createPythonProgramOptions(CommandLine line) throws
                                        "or not working as expected.", e);
                }
        }
+
+       public static ProgramOptions createPythonApplicationProgramOptions(CommandLine line) throws CliArgsException,
+               NoSuchFieldException, IllegalAccessException {
+               ProgramOptions pythonProgramOptions = createPythonProgramOptions(line);
+               Field jarFilePath = pythonProgramOptions.getClass().getSuperclass().getDeclaredField("jarFilePath");
+               jarFilePath.setAccessible(true);
+               // This is the python jar path in client, which is invalid at runtime and it will be replaced with the actual
+               // path when retrieving the packaged program in the job manager container.
+               String pythonJarPath = "local:///opt/flink/opt/" + FilenameUtils.getName(PackagedProgramUtils.getPythonJar()

Review comment:
       I am trying to understand this piece of code. First, we need to access the `jarFilePath` field of `ProgramOptions` via reflection, since it is a private field. Then we override it with `local:///opt/flink/opt/*python*.jar`. But I am afraid this would not work for YARN application mode, and I strongly suggest not doing this temporary hack.
   
   I will also give this more thought to find a proper solution.
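For comparison, a minimal sketch of passing the container-side jar path explicitly instead of overwriting a private field through reflection. `ProgramOptionsSketch` and `containerJarPath` are hypothetical, and `/opt/flink/opt` only matches the Kubernetes image layout the PR assumes; YARN would still need a different directory:

```java
import java.nio.file.Paths;

final class ExplicitJarPathSketch {

    // Hypothetical stand-in for ProgramOptions: the jar path is a constructor
    // argument, so no reflective write to a private field is needed.
    static final class ProgramOptionsSketch {
        private final String jarFilePath;

        ProgramOptionsSketch(String jarFilePath) {
            this.jarFilePath = jarFilePath;
        }

        String getJarFilePath() {
            return jarFilePath;
        }
    }

    // Builds a local:// URI for the jar inside the container from the file
    // name of the client-side jar and the container's opt directory.
    static String containerJarPath(String clientJarPath, String containerOptDir) {
        String fileName = Paths.get(clientJarPath).getFileName().toString();
        return "local://" + containerOptDir + "/" + fileName;
    }

    public static void main(String[] args) {
        ProgramOptionsSketch options = new ProgramOptionsSketch(
                containerJarPath("/path/to/flink/opt/flink-python.jar", "/opt/flink/opt"));
        System.out.println(options.getJarFilePath());
    }
}
```

Making the directory a parameter keeps the deployment-specific part visible at the call site rather than hard-coded inside an options helper.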




----------------------------------------------------------------
This is an automated message from the Apache Git Service.