Repository: flink Updated Branches: refs/heads/test_1945_pytest [created] c1dd9604d
[FLINK-1945][py] Python Tests less verbose This closes #1376 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1dd9604 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1dd9604 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1dd9604 Branch: refs/heads/test_1945_pytest Commit: c1dd9604d4c5c15917f920819957037c2f49b228 Parents: 40cb4e0 Author: zentol <ches...@apache.org> Authored: Wed Nov 18 12:52:42 2015 +0100 Committer: zentol <ches...@apache.org> Committed: Thu Nov 19 13:23:16 2015 +0100 ---------------------------------------------------------------------- flink-libraries/flink-python/pom.xml | 6 + .../flink/python/api/PythonPlanBinderTest.java | 64 ++-- .../org/apache/flink/python/api/test_csv.py | 31 -- .../org/apache/flink/python/api/test_main.py | 304 ++++++++----------- .../org/apache/flink/python/api/test_main2.py | 145 +++++++++ .../org/apache/flink/python/api/test_text.py | 30 -- .../flink/python/api/test_type_deduction.py | 35 ++- .../org/apache/flink/python/api/test_types.py | 70 ----- 8 files changed, 323 insertions(+), 362 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c1dd9604/flink-libraries/flink-python/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/pom.xml b/flink-libraries/flink-python/pom.xml index 747ff40..56dee7e 100644 --- a/flink-libraries/flink-python/pom.xml +++ b/flink-libraries/flink-python/pom.xml @@ -77,5 +77,11 @@ under the License. <artifactId>flink-clients</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/c1dd9604/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java index 697731f..244e6b7 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java +++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java @@ -20,26 +20,16 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_2; import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_3; -import org.junit.Test; -import org.junit.BeforeClass; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.flink.test.util.JavaProgramTestBase; -public class PythonPlanBinderTest { - private static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class); - - private static boolean python2Supported = true; - private static boolean python3Supported = true; - private static List<String> TEST_FILES; - - @BeforeClass - public static void setup() throws Exception { - findTestFiles(); - checkPythonSupport(); +public class PythonPlanBinderTest extends JavaProgramTestBase { + @Override + protected boolean skipCollectionExecution() { + return true; } - - private static void findTestFiles() throws Exception { - TEST_FILES = new ArrayList(); + + private static List<String> findTestFiles() throws Exception { + List<String> files = new ArrayList(); FileSystem fs = FileSystem.getLocalFileSystem(); FileStatus[] status = fs.listStatus( new Path(fs.getWorkingDirectory().toString() @@ -47,41 +37,39 @@ public class PythonPlanBinderTest { for (FileStatus f : status) { String file = f.getPath().toString(); if (file.endsWith(".py")) { - TEST_FILES.add(file); + files.add(file); } } + return files; } - - private static void checkPythonSupport() { + + private static boolean isPython2Supported() { try { Runtime.getRuntime().exec("python"); + return true; } catch (IOException ex) { - python2Supported = false; - LOG.info("No Python 2 runtime detected."); + return false; } + } + + private static boolean isPython3Supported() { try { Runtime.getRuntime().exec("python3"); + return true; } catch (IOException ex) { - python3Supported = false; - LOG.info("No Python 3 runtime detected."); + return false; } } - - @Test - public void testPython2() throws Exception { - if (python2Supported) { - for (String file : TEST_FILES) { - LOG.info("testing " + file); + + @Override + protected void testProgram() throws Exception { + if (isPython2Supported()) { + for (String file : findTestFiles()) { PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file}); } } - } - - @Test - public void testPython3() throws Exception { - if (python3Supported) { - for (String file : TEST_FILES) { - LOG.info("testing " + file); + if (isPython3Supported()) { + for (String file : findTestFiles()) { PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file}); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c1dd9604/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py deleted file mode 100644 index 62b6a1d..0000000 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_csv.py +++ /dev/null @@ -1,31 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -from flink.plan.Environment import get_environment -from flink.plan.Constants import INT, STRING -from flink.plan.Constants import WriteMode - -if __name__ == "__main__": - env = get_environment() - - d1 = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING)) - - d1.write_csv("/tmp/flink/result", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE) - - env.set_degree_of_parallelism(1) - - env.execute(local=True) http://git-wip-us.apache.org/repos/asf/flink/blob/c1dd9604/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py index 2666945..9a3a5e4 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py @@ -21,107 +21,11 @@ from flink.functions.FlatMapFunction import FlatMapFunction from flink.functions.FilterFunction import FilterFunction from flink.functions.MapPartitionFunction import MapPartitionFunction from flink.functions.ReduceFunction import ReduceFunction -from flink.functions.CrossFunction import CrossFunction -from flink.functions.JoinFunction import JoinFunction from flink.functions.GroupReduceFunction import GroupReduceFunction -from flink.functions.CoGroupFunction import CoGroupFunction -from flink.plan.Constants import INT, STRING, FLOAT, BOOL, CUSTOM, Order +from flink.plan.Constants import INT, STRING, FLOAT, BOOL, BYTES, CUSTOM, Order, WriteMode import struct - -class Mapper(MapFunction): - def map(self, value): - return value * value - - -class Filter(FilterFunction): - def __init__(self, limit): - super(Filter, self).__init__() - self.limit = limit - - def filter(self, value): - return value > self.limit - - -class FlatMap(FlatMapFunction): - def flat_map(self, value, collector): - collector.collect(value) - collector.collect(value * 2) - - -class MapPartition(MapPartitionFunction): - def map_partition(self, iterator, collector): - for value in iterator: - collector.collect(value * 2) - - -class Reduce(ReduceFunction): - def reduce(self, value1, value2): - return value1 + value2 - - -class Reduce2(ReduceFunction): - def reduce(self, value1, value2): - return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3]) - - -class Cross(CrossFunction): - def cross(self, value1, value2): - return (value1, value2[3]) - - -class MapperBcv(MapFunction): - def map(self, value): - factor = self.context.get_broadcast_variable("test")[0][0] - return value * factor - - -class Join(JoinFunction): - def join(self, value1, value2): - if value1[3]: - return value2[0] + str(value1[0]) - else: - return value2[0] + str(value1[1]) - - -class GroupReduce(GroupReduceFunction): - def reduce(self, iterator, collector): - if iterator.has_next(): - i, f, s, b = iterator.next() - for value in iterator: - i += value[0] - f += value[1] - b |= value[3] - collector.collect((i, f, s, b)) - - -class GroupReduce2(GroupReduceFunction): - def reduce(self, iterator, collector): - for value in iterator: - collector.collect(value) - - -class GroupReduce3(GroupReduceFunction): - def reduce(self, iterator, collector): - collector.collect(iterator.next()) - - def combine(self, iterator, collector): - if iterator.has_next(): - v1 = iterator.next() - if iterator.has_next(): - v2 = iterator.next() - if v1[0] < v2[0]: - collector.collect(v1) - else: - collector.collect(v2) - - -class CoGroup(CoGroupFunction): - def co_group(self, iterator1, iterator2, collector): - while iterator1.has_next() and iterator2.has_next(): - collector.collect((iterator1.next(), iterator2.next())) - - +#Utilities class Id(MapFunction): def map(self, value): return value @@ -137,10 +41,9 @@ class Verify(MapPartitionFunction): index = 0 for value in iterator: if value != self.expected[index]: - print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) - raise Exception(self.name + " failed!") + raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) index += 1 - collector.collect(self.name + " successful!") + #collector.collect(self.name + " successful!") class Verify2(MapPartitionFunction): @@ -155,8 +58,8 @@ class Verify2(MapPartitionFunction): try: self.expected.remove(value) except Exception: - raise Exception(self.name + " failed!") - collector.collect(self.name + " successful!") + raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) + #collector.collect(self.name + " successful!") if __name__ == "__main__": @@ -172,93 +75,33 @@ if __name__ == "__main__": d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3)) - d1 \ - .map((lambda x: x * x), INT).map(Mapper(), INT) \ - .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output() + d6 = env.from_elements(1, 1, 12) - d1 \ - .map(Mapper(), INT).map((lambda x: x * x), INT) \ - .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output() + #CSV Source/Sink + csv_data = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING)) - d1 \ - .filter(Filter(5)).filter(Filter(8)) \ - .map_partition(Verify([12], "Filter"), STRING).output() + csv_data.write_csv("/tmp/flink/result1", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE) - d1 \ - .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \ - .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output() + #Text Source/Sink + text_data = env.read_text("src/test/python/org/apache/flink/python/api/data_text") - d1 \ - .map_partition(MapPartition(), INT) \ - .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output() + text_data.write_text("/tmp/flink/result2", WriteMode.OVERWRITE) - d1 \ - .reduce(Reduce()) \ - .map_partition(Verify([19], "AllReduce"), STRING).output() - - d4 \ - .group_by(2).reduce(Reduce2()) \ - .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineReduce"), STRING).output() - - d4 \ - .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce(Reduce2()) \ - .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedReduce"), STRING).output() - - d1 \ - .map(MapperBcv(), INT).with_broadcast_set("test", d2) \ - .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output() - - d1 \ - .cross(d2).using(Cross(), (INT, BOOL)) \ - .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output() - - d1 \ - .cross(d3) \ - .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, ("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default Cross"), STRING).output() + #Types + env.from_elements(bytearray(b"hello"), bytearray(b"world"))\ + .map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output() - d2 \ - .cross(d3).project_second(0).project_first(0, 1) \ - .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output() + env.from_elements(1, 2, 3, 4, 5)\ + .map(Id(), INT).map_partition(Verify([1,2,3,4,5], "Int"), STRING).output() - d2 \ - .join(d3).where(2).equal_to(0).using(Join(), STRING) \ - .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output() + env.from_elements(True, True, False)\ + .map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output() - d2 \ - .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \ - .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join"), STRING).output() + env.from_elements(1.4, 1.7, 12312.23)\ + .map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output() - d2 \ - .join(d3).where(2).equal_to(0) \ - .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output() - - d2 \ - .project(0, 1, 2) \ - .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output() - - d2 \ - .union(d4) \ - .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output() - - d4 \ - .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \ - .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output() - - d4 \ - .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \ - .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedGroupReduce"), STRING).output() - - d4 \ - .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \ - .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineGroupReduce"), STRING).output() - - d5 \ - .group_by(2).sort_group(0, Order.DESCENDING).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True) \ - .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output() - - d4 \ - .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \ - .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output() + env.from_elements("hello", "world")\ + .map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output() #Custom Serialization class Ext(MapPartitionFunction): @@ -285,6 +128,105 @@ if __name__ == "__main__": .map(Id(), CUSTOM).map_partition(Ext(), INT) \ .map_partition(Verify([2, 4], "CustomTypeSerialization"), STRING).output() + #Map + class Mapper(MapFunction): + def map(self, value): + return value * value + d1 \ + .map((lambda x: x * x), INT).map(Mapper(), INT) \ + .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output() + + #FlatMap + class FlatMap(FlatMapFunction): + def flat_map(self, value, collector): + collector.collect(value) + collector.collect(value * 2) + d1 \ + .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \ + .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output() + + #MapPartition + class MapPartition(MapPartitionFunction): + def map_partition(self, iterator, collector): + for value in iterator: + collector.collect(value * 2) + d1 \ + .map_partition(MapPartition(), INT) \ + .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output() + + #Filter + class Filter(FilterFunction): + def __init__(self, limit): + super(Filter, self).__init__() + self.limit = limit + + def filter(self, value): + return value > self.limit + d1 \ + .filter(Filter(5)).filter(Filter(8)) \ + .map_partition(Verify([12], "Filter"), STRING).output() + + #Reduce + class Reduce(ReduceFunction): + def reduce(self, value1, value2): + return value1 + value2 + + class Reduce2(ReduceFunction): + def reduce(self, value1, value2): + return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3]) + d1 \ + .reduce(Reduce()) \ + .map_partition(Verify([19], "AllReduce"), STRING).output() + d4 \ + .group_by(2).reduce(Reduce2()) \ + .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineReduce"), STRING).output() + d4 \ + .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce(Reduce2()) \ + .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedReduce"), STRING).output() + + #GroupReduce + class GroupReduce(GroupReduceFunction): + def reduce(self, iterator, collector): + if iterator.has_next(): + i, f, s, b = iterator.next() + for value in iterator: + i += value[0] + f += value[1] + b |= value[3] + collector.collect((i, f, s, b)) + + class GroupReduce2(GroupReduceFunction): + def reduce(self, iterator, collector): + for value in iterator: + collector.collect(value) + + class GroupReduce3(GroupReduceFunction): + def reduce(self, iterator, collector): + collector.collect(iterator.next()) + + def combine(self, iterator, collector): + if iterator.has_next(): + v1 = iterator.next() + if iterator.has_next(): + v2 = iterator.next() + if v1[0] < v2[0]: + collector.collect(v1) + else: + collector.collect(v2) + d4 \ + .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \ + .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output() + d4 \ + .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \ + .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedGroupReduce"), STRING).output() + d4 \ + .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \ + .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineGroupReduce"), STRING).output() + d5 \ + .group_by(2).sort_group(0, Order.DESCENDING).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True) \ + .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output() + + #Execution env.set_degree_of_parallelism(1) env.execute(local=True) http://git-wip-us.apache.org/repos/asf/flink/blob/c1dd9604/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py new file mode 100644 index 0000000..2f30cda --- /dev/null +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py @@ -0,0 +1,145 @@ +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from flink.plan.Environment import get_environment +from flink.functions.MapFunction import MapFunction +from flink.functions.MapPartitionFunction import MapPartitionFunction +from flink.functions.CrossFunction import CrossFunction +from flink.functions.JoinFunction import JoinFunction +from flink.functions.CoGroupFunction import CoGroupFunction +from flink.plan.Constants import BOOL, INT, FLOAT, STRING + + +#Utilities +class Id(MapFunction): + def map(self, value): + return value + + +class Verify(MapPartitionFunction): + def __init__(self, expected, name): + super(Verify, self).__init__() + self.expected = expected + self.name = name + + def map_partition(self, iterator, collector): + index = 0 + for value in iterator: + if value != self.expected[index]: + raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) + index += 1 + #collector.collect(self.name + " successful!") + + +class Verify2(MapPartitionFunction): + def __init__(self, expected, name): + super(Verify2, self).__init__() + self.expected = expected + self.name = name + + def map_partition(self, iterator, collector): + for value in iterator: + if value in self.expected: + try: + self.expected.remove(value) + except Exception: + raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) + #collector.collect(self.name + " successful!") + + +if __name__ == "__main__": + env = get_environment() + + d1 = env.from_elements(1, 6, 12) + + d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False)) + + d3 = env.from_elements(("hello",), ("world",)) + + d4 = env.from_elements((1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)) + + d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3)) + + d6 = env.from_elements(1, 1, 12) + + #Join + class Join(JoinFunction): + def join(self, value1, value2): + if value1[3]: + return value2[0] + str(value1[0]) + else: + return value2[0] + str(value1[1]) + d2 \ + .join(d3).where(2).equal_to(0).using(Join(), STRING) \ + .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output() + d2 \ + .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \ + .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join"), STRING).output() + d2 \ + .join(d3).where(2).equal_to(0) \ + .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output() + + #Cross + class Cross(CrossFunction): + def cross(self, value1, value2): + return (value1, value2[3]) + d1 \ + .cross(d2).using(Cross(), (INT, BOOL)) \ + .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output() + d1 \ + .cross(d3) \ + .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, ("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default Cross"), STRING).output() + d2 \ + .cross(d3).project_second(0).project_first(0, 1) \ + .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output() + + #CoGroup + class CoGroup(CoGroupFunction): + def co_group(self, iterator1, iterator2, collector): + while iterator1.has_next() and iterator2.has_next(): + collector.collect((iterator1.next(), iterator2.next())) + d4 \ + .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \ + .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output() + + #Broadcast + class MapperBcv(MapFunction): + def map(self, value): + factor = self.context.get_broadcast_variable("test")[0][0] + return value * factor + d1 \ + .map(MapperBcv(), INT).with_broadcast_set("test", d2) \ + .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output() + + #Misc + class Mapper(MapFunction): + def map(self, value): + return value * value + d1 \ + .map(Mapper(), INT).map((lambda x: x * x), INT) \ + .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output() + d2 \ + .project(0, 1, 2) \ + .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output() + d2 \ + .union(d4) \ + .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output() + + #Execution + env.set_degree_of_parallelism(1) + + env.execute(local=True) http://git-wip-us.apache.org/repos/asf/flink/blob/c1dd9604/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py deleted file mode 100644 index 1f90587..0000000 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py +++ /dev/null @@ -1,30 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -from flink.plan.Environment import get_environment -from flink.plan.Constants import WriteMode - -if __name__ == "__main__": - env = get_environment() - - d1 = env.read_text("src/test/python/org/apache/flink/python/api/data_text") - - d1.write_text("/tmp/flink/result", WriteMode.OVERWRITE) - - env.set_degree_of_parallelism(1) - - env.execute(local=True) http://git-wip-us.apache.org/repos/asf/flink/blob/c1dd9604/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py index b2063eb..1ff3f92 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py @@ -16,8 +16,20 @@ # limitations under the License. ################################################################################ from flink.plan.Environment import get_environment -from flink.plan.Constants import INT, STRING, BOOL, FLOAT -import sys +from flink.plan.Constants import BOOL, STRING +from flink.functions.MapPartitionFunction import MapPartitionFunction + + +class Verify(MapPartitionFunction): + def __init__(self, msg): + super(Verify, self).__init__() + self.msg = msg + + def map_partition(self, iterator, collector): + if self.msg is None: + return + else: + raise Exception("Type Deduction failed: " + self.msg) if __name__ == "__main__": env = get_environment() @@ -28,35 +40,34 @@ if __name__ == "__main__": direct_from_source = d1.filter(lambda x:True) + msg = None + if direct_from_source._info.types != ("hello", 4, 3.2, True): - sys.exit("Error deducting type directly from source.") + msg = "Error deducting type directly from source." from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x:True) if from_common_udf._info.types != BOOL: - sys.exit("Error deducting type from common udf.") + msg = "Error deducting type from common udf." through_projection = d1.project(3, 2).filter(lambda x:True) if through_projection._info.types != (True, 3.2): - sys.exit("Error deducting type through projection.") + msg = "Error deducting type through projection." through_default_op = d1.cross(d2).filter(lambda x:True) if through_default_op._info.types != (("hello", 4, 3.2, True), "world"): - sys.exit("Error deducting type through default J/C." +str(through_default_op._info.types)) + msg = "Error deducting type through default J/C." +str(through_default_op._info.types) through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x:True) if through_prj_op._info.types != (4, "hello", "world", True, 3.2): - sys.exit("Error deducting type through projection J/C. "+str(through_prj_op._info.types)) + msg = "Error deducting type through projection J/C. "+str(through_prj_op._info.types) env = get_environment() - msg = env.from_elements("Type deduction test successful.") - - msg.output() - - env.execute() + env.from_elements("dummy").map_partition(Verify(msg), STRING).output() + env.execute(local=True) http://git-wip-us.apache.org/repos/asf/flink/blob/c1dd9604/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py deleted file mode 100644 index f5f3ee4..0000000 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py +++ /dev/null @@ -1,70 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -from flink.plan.Environment import get_environment -from flink.functions.MapFunction import MapFunction -from flink.functions.MapPartitionFunction import MapPartitionFunction -from flink.plan.Constants import BOOL, INT, FLOAT, STRING, BYTES - - -class Verify(MapPartitionFunction): - def __init__(self, expected, name): - super(Verify, self).__init__() - self.expected = expected - self.name = name - - def map_partition(self, iterator, collector): - index = 0 - for value in iterator: - if value != self.expected[index]: - print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) - raise Exception(self.name + " failed!") - index += 1 - collector.collect(self.name + " successful!") - - -class Id(MapFunction): - def map(self, value): - return value - - -if __name__ == "__main__": - env = get_environment() - - d1 = env.from_elements(bytearray(b"hello"), bytearray(b"world")) - - d1.map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output() - - d2 = env.from_elements(1,2,3,4,5) - - d2.map(Id(), INT).map_partition(Verify([1,2,3,4,5], "Int"), STRING).output() - - d3 = env.from_elements(True, True, False) - - d3.map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output() - - d4 = env.from_elements(1.4, 1.7, 12312.23) - - d4.map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output() - - d5 = env.from_elements("hello", "world") - - d5.map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output() - - env.set_degree_of_parallelism(1) - - env.execute(local=True) \ No newline at end of file