[FLINK-8114][py] Fix forwarding of arguments

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a4a677
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a4a677
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a4a677

Branch: refs/heads/master
Commit: b0a4a67705c187920ac9151a2a0c6abe25b9488e
Parents: 316fa1f
Author: zentol <[email protected]>
Authored: Mon Nov 20 15:07:32 2017 +0100
Committer: zentol <[email protected]>
Committed: Mon Nov 20 15:39:04 2017 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      |  3 +-
 .../flink/python/api/PythonPlanBinderTest.java  | 36 ++++++++++++++++----
 .../flink/python/api/args/multiple_args.py      | 32 +++++++++++++++++
 .../org/apache/flink/python/api/args/no_arg.py  | 32 +++++++++++++++++
 4 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 3fec947..864ea30 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -117,7 +117,8 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
 
                String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH);
 
-               process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, "")});
+               String arguments = config.getString(PLAN_ARGUMENTS_KEY, "");
+               process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + arguments);
                outPrinter = new Thread(new StreamPrinter(process.getInputStream()));
                outPrinter.start();
                errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 55cf1dc..92a985c 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -37,18 +37,19 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
                return true;
        }
 
-       private static String findUtilsFile() throws Exception {
+       private static Path getBaseTestPythonDir() {
                FileSystem fs = FileSystem.getLocalFileSystem();
-               return fs.getWorkingDirectory().toString()
-                               + "/src/test/python/org/apache/flink/python/api/utils/utils.py";
+               return new Path(fs.getWorkingDirectory(), "src/test/python/org/apache/flink/python/api");
+       }
+
+       private static String findUtilsFile() throws Exception {
+               return new Path(getBaseTestPythonDir(), "utils/utils.py").toString();
        }
 
        private static List<String> findTestFiles() throws Exception {
                List<String> files = new ArrayList<>();
                FileSystem fs = FileSystem.getLocalFileSystem();
-               FileStatus[] status = fs.listStatus(
-                               new Path(fs.getWorkingDirectory().toString()
-                                               + "/src/test/python/org/apache/flink/python/api"));
+               FileStatus[] status = fs.listStatus(getBaseTestPythonDir());
                for (FileStatus f : status) {
                        String file = f.getPath().toString();
                        if (file.endsWith(".py")) {
@@ -126,11 +127,13 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
                if (python2 != null) {
                        log.info("Running python2 tests");
                        runTestPrograms(python2);
+                       runArgvTestPrograms(python2);
                }
                String python3 = getPython3Path();
                if (python3 != null) {
                        log.info("Running python3 tests");
                        runTestPrograms(python3);
+                       runArgvTestPrograms(python3);
                }
        }
 
@@ -177,4 +180,25 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
                        // we expect this exception to be thrown since no argument was passed
                }
        }
+
+       private void runArgvTestPrograms(String pythonBinary) throws Exception {
+               log.info("Running runArgvTestPrograms.");
+               String utils = findUtilsFile();
+
+               {
+                       String noArgTestPath = new Path(getBaseTestPythonDir(), "args/no_arg.py").toString();
+
+                       Configuration configuration = new Configuration();
+                       configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+                       new PythonPlanBinder(configuration).runPlan(new String[]{noArgTestPath, utils});
+               }
+
+               {
+                       String multiArgTestPath = new Path(getBaseTestPythonDir(), "args/multiple_args.py").toString();
+
+                       Configuration configuration = new Configuration();
+                       configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+                       new PythonPlanBinder(configuration).runPlan(new String[]{multiArgTestPath, utils, "-", "hello", "world"});
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
new file mode 100644
index 0000000..57b44c3
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/multiple_args.py
@@ -0,0 +1,32 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+import sys
+from utils import Verify
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(len(sys.argv))
+
+    d1.map_partition(Verify([3], "MultipleArguments")).output()
+
+    #Execution
+    env.set_parallelism(1)
+
+    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a4a677/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
new file mode 100644
index 0000000..6afe7f2
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/args/no_arg.py
@@ -0,0 +1,32 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+import sys
+from utils import Verify
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(len(sys.argv))
+    
+    d1.map_partition(Verify([1], "NoArgument")).output()
+
+    #Execution
+    env.set_parallelism(1)
+
+    env.execute(local=True)

Reply via email to