[FLINK-4002] [py] Improve testing infrastructure This closes #2063
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76dcbd45 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76dcbd45 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76dcbd45 Branch: refs/heads/master Commit: 76dcbd458817566166022c400c00df972043453e Parents: e90ea9c Author: omaralvarez <[email protected]> Authored: Fri Jun 10 14:06:03 2016 +0200 Committer: zentol <[email protected]> Committed: Fri Jun 10 14:07:41 2016 +0200 ---------------------------------------------------------------------- .../flink/python/api/PythonPlanBinderTest.java | 11 ++- .../org/apache/flink/python/api/test_main.py | 38 +-------- .../org/apache/flink/python/api/test_main2.py | 40 +--------- .../org/apache/flink/python/api/utils/utils.py | 81 ++++++++++++++++++++ 4 files changed, 92 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/76dcbd45/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java index 244e6b7..c53d408 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java +++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java @@ -28,6 +28,12 @@ public class PythonPlanBinderTest extends JavaProgramTestBase { return true; } + private static String findUtilsFile() throws Exception { + FileSystem fs = FileSystem.getLocalFileSystem(); + return fs.getWorkingDirectory().toString() + + "/src/test/python/org/apache/flink/python/api/utils/utils.py"; + } + private static List<String> findTestFiles() throws Exception { List<String> files = new ArrayList(); FileSystem fs = FileSystem.getLocalFileSystem(); @@ -63,14 +69,15 @@ public class PythonPlanBinderTest extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { + String utils = findUtilsFile(); if (isPython2Supported()) { for (String file : findTestFiles()) { - PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file}); + PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file, utils}); } } if (isPython3Supported()) { for (String file : findTestFiles()) { - PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file}); + PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file, utils}); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/76dcbd45/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py index 9b0f144..c0a4414 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py @@ -25,43 +25,7 @@ from flink.functions.GroupReduceFunction import GroupReduceFunction from flink.plan.Constants import Order, WriteMode from flink.plan.Constants import INT, STRING import struct - -#Utilities -class Id(MapFunction): - def map(self, value): - return value - - -class Verify(MapPartitionFunction): - def __init__(self, expected, name): - super(Verify, self).__init__() - self.expected = expected - self.name = name - - def map_partition(self, iterator, collector): - index = 0 - for value in iterator: - if value != self.expected[index]: - raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) - index += 1 - #collector.collect(self.name + " successful!") - - -class Verify2(MapPartitionFunction): - def __init__(self, expected, name): - super(Verify2, self).__init__() - self.expected = expected - self.name = name - - def map_partition(self, iterator, collector): - for value in iterator: - if value in self.expected: - try: - self.expected.remove(value) - except Exception: - raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) - #collector.collect(self.name + " successful!") - +from utils import Id, Verify if __name__ == "__main__": env = get_environment() http://git-wip-us.apache.org/repos/asf/flink/blob/76dcbd45/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py index 787928c..2ea6f91 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py @@ -18,48 +18,10 @@ ################################################################################ from flink.plan.Environment import get_environment from flink.functions.MapFunction import MapFunction -from flink.functions.MapPartitionFunction import MapPartitionFunction from flink.functions.CrossFunction import CrossFunction from flink.functions.JoinFunction import JoinFunction from flink.functions.CoGroupFunction import CoGroupFunction - - -#Utilities -class Id(MapFunction): - def map(self, value): - return value - - -class Verify(MapPartitionFunction): - def __init__(self, expected, name): - super(Verify, self).__init__() - self.expected = expected - self.name = name - - def map_partition(self, iterator, collector): - index = 0 - for value in iterator: - if value != self.expected[index]: - raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) - index += 1 - #collector.collect(self.name + " successful!") - - -class Verify2(MapPartitionFunction): - def __init__(self, expected, name): - super(Verify2, self).__init__() - self.expected = expected - self.name = name - - def map_partition(self, iterator, collector): - for value in iterator: - if value in self.expected: - try: - self.expected.remove(value) - except Exception: - raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) - #collector.collect(self.name + " successful!") - +from utils import Verify, Verify2 if __name__ == "__main__": env = get_environment() http://git-wip-us.apache.org/repos/asf/flink/blob/76dcbd45/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py new file mode 100644 index 0000000..78999b1 --- /dev/null +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py @@ -0,0 +1,81 @@ +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from flink.functions.MapFunction import MapFunction +from flink.functions.MapPartitionFunction import MapPartitionFunction + + +class Id(MapFunction): + def map(self, value): + """ + Simple map function to forward test results. + + :param value: Input value. + :return: Forwarded value. + """ + return value + + +class Verify(MapPartitionFunction): + def __init__(self, expected, name): + super(Verify, self).__init__() + self.expected = expected + self.name = name + + def map_partition(self, iterator, collector): + """ + Compares elements in the expected values list against actual values in resulting DataSet. + + :param iterator: Iterator for the corresponding DataSet partition. + :param collector: Collector for the result records. + """ + index = 0 + for value in iterator: + try: + if value != self.expected[index]: + raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) + except IndexError: + raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") + index += 1 + if len(self.expected) != index: + raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") + #collector.collect(self.name + " successful!") + + +class Verify2(MapPartitionFunction): + def __init__(self, expected, name): + super(Verify2, self).__init__() + self.expected = expected + self.name = name + + def map_partition(self, iterator, collector): + """ + Compares elements in the expected values list against actual values in resulting DataSet. + + This function does not compare element by element, since for example in a Union order is not guaranteed. + + Elements are removed from the expected values list for the whole DataSet. + + :param iterator: Iterator for the corresponding DataSet partition. + :param collector: Collector for the result records. + """ + for value in iterator: + try: + self.expected.remove(value) + except Exception: + raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: " + str(self.expected)) + #collector.collect(self.name + " successful!")
