dianfu commented on a change in pull request #8472: [FLINK-12327][python] Adds 
support to submit Python Table API job in CliFrontend
URL: https://github.com/apache/flink/pull/8472#discussion_r284995698
 
 

 ##########
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
 ##########
 @@ -771,22 +773,31 @@ PackagedProgram buildProgram(ProgramOptions options) 
throws FileNotFoundExceptio
                String jarFilePath = options.getJarFilePath();
                List<URL> classpaths = options.getClasspaths();
 
-               if (jarFilePath == null) {
-                       throw new IllegalArgumentException("The program JAR 
file was not specified.");
+               // The job can be distinguished. i.e. python or java.
+               if (!options.isDistinguishedJob()) {
+                       throw new IllegalArgumentException("The program should 
be specified a JAR file " +
+                               "or a python file(or module)");
                }
 
-               File jarFile = new File(jarFilePath);
+               File jarFile = null;
+               // If the job is java job, it should be checked whether jar 
file exists.
+               if (options.isJava()) {
+                       jarFile = new File(jarFilePath);
 
-               // Check if JAR file exists
-               if (!jarFile.exists()) {
-                       throw new FileNotFoundException("JAR file does not 
exist: " + jarFile);
-               }
-               else if (!jarFile.isFile()) {
-                       throw new FileNotFoundException("JAR file is not a 
file: " + jarFile);
+                       // Check if JAR file exists
+                       if (!jarFile.exists()) {
+                               throw new FileNotFoundException("JAR file does 
not exist: " + jarFile);
+                       } else if (!jarFile.isFile()) {
+                               throw new FileNotFoundException("JAR file is 
not a file: " + jarFile);
+                       }
                }
 
                // Get assembler class
                String entryPointClass = options.getEntryPointClassName();
+               // If the job is a python job, the entry point class is 
PythonDriver.
+               if (entryPointClass == null && options.isPython()) {
+                       entryPointClass = 
"org.apache.flink.client.python.PythonDriver";
 
 Review comment:
   Use `PythonDriver.class.getCanonicalName()` here instead of hard-coding the fully qualified class name as a string literal.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to