Repository: flink
Updated Branches:
  refs/heads/master e5ab4053b -> b0a4a6770


[FLINK-8109][py] Check for existence of plan/additional files


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/316fa1fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/316fa1fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/316fa1fd

Branch: refs/heads/master
Commit: 316fa1fd59b80f453db5900437ae553ae24a5966
Parents: 8ec3fce
Author: zentol <[email protected]>
Authored: Mon Nov 20 12:58:27 2017 +0100
Committer: zentol <[email protected]>
Committed: Mon Nov 20 14:37:52 2017 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonPlanBinder.java      | 21 +++++++-
 .../flink/python/api/PythonPlanBinderTest.java  | 53 ++++++++++++++++----
 2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/316fa1fd/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index b7adde1..e0c8215 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
@@ -93,7 +94,12 @@ public class PythonPlanBinder {
        public static void main(String[] args) throws Exception {
                Configuration globalConfig = 
GlobalConfiguration.loadConfiguration();
                PythonPlanBinder binder = new PythonPlanBinder(globalConfig);
-               binder.runPlan(args);
+               try {
+                       binder.runPlan(args);
+               } catch (Exception e) {
+                       System.out.println("Failed to run plan: " + 
e.getMessage());
+                       LOG.error("Failed to run plan.", e);
+               }
        }
 
        public PythonPlanBinder(Configuration globalConfig) {
@@ -146,11 +152,22 @@ public class PythonPlanBinder {
 
                        operatorConfig.setString(PLAN_ARGUMENTS_KEY, 
planArguments);
 
+                       Path planPath = new Path(planFile);
+                       if 
(!FileSystem.getUnguardedFileSystem(planPath.toUri()).exists(planPath)) {
+                               throw new FileNotFoundException("Plan file " + 
planFile + " does not exist.");
+                       }
+                       for (String file : filesToCopy) {
+                               Path filePath = new Path(file);
+                               if 
(!FileSystem.getUnguardedFileSystem(filePath.toUri()).exists(filePath)) {
+                                       throw new 
FileNotFoundException("Additional file " + file + " does not exist.");
+                               }
+                       }
+
                        // copy flink library, plan file and additional files 
to temporary location
                        Path tmpPlanFilesPath = new Path(tmpPlanFilesDir);
                        deleteIfExists(tmpPlanFilesPath);
                        FileCache.copy(new Path(pythonLibraryPath), 
tmpPlanFilesPath, false);
-                       copyFile(new Path(planFile), tmpPlanFilesPath, 
FLINK_PYTHON_PLAN_NAME);
+                       copyFile(planPath, tmpPlanFilesPath, 
FLINK_PYTHON_PLAN_NAME);
                        for (String file : filesToCopy) {
                                Path source = new Path(file);
                                copyFile(source, tmpPlanFilesPath, 
source.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/316fa1fd/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
 
b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 9e63091..55cf1dc 100644
--- 
a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ 
b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 import java.io.BufferedReader;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
@@ -119,22 +120,52 @@ public class PythonPlanBinderTest extends 
JavaProgramTestBase {
        @Override
        protected void testProgram() throws Exception {
                testBoundCheck();
-               String utils = findUtilsFile();
+               testNotExistingPlanFile();
+               testNotExistingAdditionalFile();
                String python2 = getPython2Path();
                if (python2 != null) {
-                       for (String file : findTestFiles()) {
-                               Configuration configuration = new 
Configuration();
-                               
configuration.setString(PythonOptions.PYTHON_BINARY_PATH, python2);
-                               new PythonPlanBinder(configuration).runPlan(new 
String[]{file, utils});
-                       }
+                       log.info("Running python2 tests");
+                       runTestPrograms(python2);
                }
                String python3 = getPython3Path();
                if (python3 != null) {
-                       for (String file : findTestFiles()) {
-                               Configuration configuration = new 
Configuration();
-                               
configuration.setString(PythonOptions.PYTHON_BINARY_PATH, python3);
-                               new PythonPlanBinder(configuration).runPlan(new 
String[]{file, utils});
-                       }
+                       log.info("Running python3 tests");
+                       runTestPrograms(python3);
+               }
+       }
+
+       private void runTestPrograms(String pythonBinary) throws Exception {
+               String utils = findUtilsFile();
+               for (String file : findTestFiles()) {
+                       log.info("Running file {}.", file);
+                       Configuration configuration = new Configuration();
+                       
configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+                       new PythonPlanBinder(configuration).runPlan(new 
String[]{file, utils});
+               }
+       }
+
+       private void testNotExistingPlanFile() throws Exception {
+               log.info("Running testNotExistingPlanFile.");
+               String utils = findUtilsFile();
+               String nonExistingPlan = utils + "abc";
+               Configuration configuration = new Configuration();
+               try {
+                       new PythonPlanBinder(configuration).runPlan(new 
String[]{nonExistingPlan});
+               } catch (FileNotFoundException expected) {
+                       // we expect this exception to be thrown since the plan 
file does not exist
+               }
+       }
+
+       private void testNotExistingAdditionalFile() throws Exception {
+               log.info("Running testNotExistingAdditionalFile.");
+               String utils = findUtilsFile();
+               String planFile = findTestFiles().iterator().next();
+               String nonExistingLibrary = utils + "abc";
+               Configuration configuration = new Configuration();
+               try {
+                       new PythonPlanBinder(configuration).runPlan(new 
String[]{planFile, utils, nonExistingLibrary});
+               } catch (FileNotFoundException expected) {
+                       // we expect this exception to be thrown since the 
additional file does not exist
                }
        }
 

Reply via email to