[FLINK-8114][py] Fix forwarding of arguments
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a4a677 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a4a677 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a4a677 Branch: refs/heads/master Commit: b0a4a67705c187920ac9151a2a0c6abe25b9488e Parents: 316fa1f Author: zentol <[email protected]> Authored: Mon Nov 20 15:07:32 2017 +0100 Committer: zentol <[email protected]> Committed: Mon Nov 20 15:39:04 2017 +0100 ---------------------------------------------------------------------- .../api/streaming/data/PythonStreamer.java | 3 +- .../flink/python/api/PythonPlanBinderTest.java | 36 ++++++++++++++++---- .../flink/python/api/args/multiple_args.py | 32 +++++++++++++++++ .../org/apache/flink/python/api/args/no_arg.py | 32 +++++++++++++++++ 4 files changed, 96 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java index 3fec947..864ea30 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java @@ -117,7 +117,8 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH); - process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, "")}); + String arguments = config.getString(PLAN_ARGUMENTS_KEY, ""); + process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + arguments); outPrinter = new Thread(new StreamPrinter(process.getInputStream())); outPrinter.start(); errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg)); http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java index 55cf1dc..92a985c 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java +++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java @@ -37,18 +37,19 @@ public class PythonPlanBinderTest extends JavaProgramTestBase { return true; } - private static String findUtilsFile() throws Exception { + private static Path getBaseTestPythonDir() { FileSystem fs = FileSystem.getLocalFileSystem(); - return fs.getWorkingDirectory().toString() - + "/src/test/python/org/apache/flink/python/api/utils/utils.py"; + return new Path(fs.getWorkingDirectory(), "src/test/python/org/apache/flink/python/api"); + } + + private static String findUtilsFile() throws Exception { + return new Path(getBaseTestPythonDir(), "utils/utils.py").toString(); } private static List<String> findTestFiles() throws Exception { List<String> files = new ArrayList<>(); FileSystem fs = FileSystem.getLocalFileSystem(); - FileStatus[] status = fs.listStatus( - new Path(fs.getWorkingDirectory().toString() - + "/src/test/python/org/apache/flink/python/api")); + FileStatus[] status = fs.listStatus(getBaseTestPythonDir()); for (FileStatus f : status) { String file = f.getPath().toString(); if (file.endsWith(".py")) { @@ -126,11 +127,13 @@ public class PythonPlanBinderTest extends JavaProgramTestBase { if (python2 != null) { log.info("Running python2 tests"); runTestPrograms(python2); + runArgvTestPrograms(python2); } String python3 = getPython3Path(); if (python3 != null) { log.info("Running python3 tests"); runTestPrograms(python3); + runArgvTestPrograms(python3); } } @@ -177,4 +180,25 @@ public class PythonPlanBinderTest extends JavaProgramTestBase { // we expect this exception to be thrown since no argument was passed } } + + private void runArgvTestPrograms(String pythonBinary) throws Exception { + log.info("Running runArgvTestPrograms."); + String utils = findUtilsFile(); + + { + String noArgTestPath = new Path(getBaseTestPythonDir(), "args/no_arg.py").toString(); + + Configuration configuration = new Configuration(); + configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary); + new PythonPlanBinder(configuration).runPlan(new String[]{noArgTestPath, utils}); + } + + { + String multiArgTestPath = new Path(getBaseTestPythonDir(), "args/multiple_args.py").toString(); + + Configuration configuration = new Configuration(); + configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary); + new PythonPlanBinder(configuration).runPlan(new String[]{multiArgTestPath, utils, "-", "hello", "world"}); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py new file mode 100644 index 0000000..57b44c3 --- /dev/null +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py @@ -0,0 +1,32 @@ +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from flink.plan.Environment import get_environment +import sys +from utils import Verify + +if __name__ == "__main__": + env = get_environment() + + d1 = env.from_elements(len(sys.argv)) + + d1.map_partition(Verify([3], "MultipleArguments")).output() + + #Execution + env.set_parallelism(1) + + env.execute(local=True) http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py new file mode 100644 index 0000000..6afe7f2 --- /dev/null +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py @@ -0,0 +1,32 @@ +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from flink.plan.Environment import get_environment +import sys +from utils import Verify + +if __name__ == "__main__": + env = get_environment() + + d1 = env.from_elements(len(sys.argv)) + + d1.map_partition(Verify([1], "NoArgument")).output() + + #Execution + env.set_parallelism(1) + + env.execute(local=True)
