Repository: flink Updated Branches: refs/heads/master e5ab4053b -> b0a4a6770
[FLINK-8109][py] Check for existence of plan/additional files Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/316fa1fd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/316fa1fd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/316fa1fd Branch: refs/heads/master Commit: 316fa1fd59b80f453db5900437ae553ae24a5966 Parents: 8ec3fce Author: zentol <[email protected]> Authored: Mon Nov 20 12:58:27 2017 +0100 Committer: zentol <[email protected]> Committed: Mon Nov 20 14:37:52 2017 +0100 ---------------------------------------------------------------------- .../flink/python/api/PythonPlanBinder.java | 21 +++++++- .../flink/python/api/PythonPlanBinderTest.java | 53 ++++++++++++++++---- 2 files changed, 61 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/316fa1fd/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index b7adde1..e0c8215 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -50,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; import java.util.UUID; @@ -93,7 +94,12 @@ public class PythonPlanBinder { public static void main(String[] args) throws Exception { Configuration globalConfig = GlobalConfiguration.loadConfiguration(); PythonPlanBinder binder = new PythonPlanBinder(globalConfig); - binder.runPlan(args); + try { + binder.runPlan(args); + } catch (Exception e) { + System.out.println("Failed to run plan: " + e.getMessage()); + LOG.error("Failed to run plan.", e); + } } public PythonPlanBinder(Configuration globalConfig) { @@ -146,11 +152,22 @@ public class PythonPlanBinder { operatorConfig.setString(PLAN_ARGUMENTS_KEY, planArguments); + Path planPath = new Path(planFile); + if (!FileSystem.getUnguardedFileSystem(planPath.toUri()).exists(planPath)) { + throw new FileNotFoundException("Plan file " + planFile + " does not exist."); + } + for (String file : filesToCopy) { + Path filePath = new Path(file); + if (!FileSystem.getUnguardedFileSystem(filePath.toUri()).exists(filePath)) { + throw new FileNotFoundException("Additional file " + file + " does not exist."); + } + } + // copy flink library, plan file and additional files to temporary location Path tmpPlanFilesPath = new Path(tmpPlanFilesDir); deleteIfExists(tmpPlanFilesPath); FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false); - copyFile(new Path(planFile), tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME); + copyFile(planPath, tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME); for (String file : filesToCopy) { Path source = new Path(file); copyFile(source, tmpPlanFilesPath, source.getName()); http://git-wip-us.apache.org/repos/asf/flink/blob/316fa1fd/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java index 9e63091..55cf1dc 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java +++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java @@ -21,6 +21,7 @@ import org.apache.flink.python.api.streaming.data.PythonStreamer; import org.apache.flink.test.util.JavaProgramTestBase; import java.io.BufferedReader; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; @@ -119,22 +120,52 @@ public class PythonPlanBinderTest extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { testBoundCheck(); - String utils = findUtilsFile(); + testNotExistingPlanFile(); + testNotExistingAdditionalFile(); String python2 = getPython2Path(); if (python2 != null) { - for (String file : findTestFiles()) { - Configuration configuration = new Configuration(); - configuration.setString(PythonOptions.PYTHON_BINARY_PATH, python2); - new PythonPlanBinder(configuration).runPlan(new String[]{file, utils}); - } + log.info("Running python2 tests"); + runTestPrograms(python2); } String python3 = getPython3Path(); if (python3 != null) { - for (String file : findTestFiles()) { - Configuration configuration = new Configuration(); - configuration.setString(PythonOptions.PYTHON_BINARY_PATH, python3); - new PythonPlanBinder(configuration).runPlan(new String[]{file, utils}); - } + log.info("Running python3 tests"); + runTestPrograms(python3); + } + } + + private void runTestPrograms(String pythonBinary) throws Exception { + String utils = findUtilsFile(); + for (String file : findTestFiles()) { + log.info("Running file {}.", file); + Configuration configuration = new Configuration(); + configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary); + new PythonPlanBinder(configuration).runPlan(new String[]{file, utils}); + } + } + + private void testNotExistingPlanFile() throws Exception { + log.info("Running testNotExistingPlanFile."); + String utils = findUtilsFile(); + String nonExistingPlan = utils + "abc"; + Configuration configuration = new Configuration(); + try { + new PythonPlanBinder(configuration).runPlan(new String[]{nonExistingPlan}); + } catch (FileNotFoundException expected) { + // we expect this exception to be thrown since the plan file does not exist + } + } + + private void testNotExistingAdditionalFile() throws Exception { + log.info("Running testNotExistingAdditionalFile."); + String utils = findUtilsFile(); + String planFile = findTestFiles().iterator().next(); + String nonExistingLibrary = utils + "abc"; + Configuration configuration = new Configuration(); + try { + new PythonPlanBinder(configuration).runPlan(new String[]{planFile, utils, nonExistingLibrary}); + } catch (FileNotFoundException expected) { + // we expect this exception to be thrown since the plan file does not exist } }
