http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py new file mode 100644 index 0000000..1970918 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py @@ -0,0 +1,75 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
import sys
from os.path import dirname, join, basename
from glob import glob
from org.apache.flink.runtime.client import JobExecutionException


# Module names (file name without ".py") that the runner must not import/run.
excluded_tests = [
    'test_kafka09',
]


def _banner(text):
    """Return ``text`` framed above and below by a '#' rule of the same width.

    ``text`` is converted with ``str()`` first, so non-string values (an
    exception class, a ``None`` message) can be framed without raising.
    """
    text = str(text)
    rule = '#' * len(text)
    return "\n{}\n{}\n{}\n".format(rule, text, rule)


class Main:
    """Discover the ``test_*.py`` modules next to this script and run them."""

    def __init__(self):
        pass

    def run(self):
        """Import every non-excluded test module and call its ``main()``.

        Failures are collected rather than aborting the run; once all tests
        have been attempted an ``Exception`` naming the failed modules is
        raised if there were any.
        """
        pwd = dirname(sys.argv[0])
        print("Working directory: {}".format(pwd))

        if excluded_tests:
            print("Excluded tests: {}\n".format(excluded_tests))

        tests = []
        for path in glob(join(pwd, 'test_*.py')):
            # The glob pattern already guarantees the file name starts with
            # 'test_'.  The previous "startswith('__')" guard was applied to
            # the full path, so it could only ever skip *all* tests when the
            # directory name happened to start with '__'.
            test_module_name = basename(path)[:-3]  # strip ".py"
            if test_module_name not in excluded_tests:
                tests.append(__import__(test_module_name, globals(), locals()))

        failed_tests = []
        for test in tests:
            print("Submitting job ... '{}'".format(test.__name__))
            try:
                test.main()
                print("Job completed ('{}')\n".format(test.__name__))
            except JobExecutionException as ex:
                failed_tests.append(test.__name__)
                print(_banner(ex.message))
            except:  # noqa: E722
                # Deliberately a bare except: this is the runner's top-level
                # boundary and one broken test must not stop the others.
                # NOTE(review): presumably also needed to catch Java
                # throwables under Jython -- confirm before narrowing.
                failed_tests.append(test.__name__)
                # Only the exception *type* is reported, as before; it is a
                # class, so it has to be stringified before measuring it.
                print(_banner(sys.exc_info()[0]))

        if failed_tests:
            print("\nThe following tests were failed:")
            for failed_test in failed_tests:
                print("\t* " + failed_test)
            raise Exception("\nFailed test(s): {}".format(failed_tests))
        else:
            print("\n*** All tests passed successfully ***")


def main():
    Main().run()


if __name__ == "__main__":
    main()
http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py new file mode 100644 index 0000000..c32124e --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py @@ -0,0 +1,82 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
import sys
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction, FilterFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.pygeneratorbase import PyGeneratorBase
from utils.python_test_base import TestBase


class Generator(PyGeneratorBase):
    """Source generator that alternates between 'Hello' and 'World'."""

    def __init__(self, num_iters):
        super(Generator, self).__init__(num_iters)
        # True -> the next word is 'Hello'; False -> 'World'.
        self._alternator = True

    def do(self, ctx):
        """Collect one word into ``ctx`` and flip the alternation flag."""
        if self._alternator:
            word = 'Hello'
        else:
            word = 'World'
        ctx.collect(word)
        self._alternator = not self._alternator


class Filterer(FilterFunction):
    """Keeps only values equal to 'Hello'."""

    def filter(self, value):
        keep = value == 'Hello'
        return keep


class Tokenizer(FlatMapFunction):
    """Emits a ``(1, word)`` pair for every whitespace-separated word."""

    def flatMap(self, value, collector):
        lowered = value.lower()
        for token in lowered.split():
            collector.collect((1, token))


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, word)`` pairs, keeping the first word."""

    def reduce(self, input1, input2):
        left_count, left_word = input1
        right_count, _ = input2
        total = left_count + right_count
        return (total, left_word)


class Selector(KeySelector):
    """Keys a ``(count, word)`` pair by its word."""

    def getKey(self, input):
        word = input[1]
        return word


class Main(TestBase):
    """Job: generate -> filter -> tokenize -> key by word -> window -> sum -> print."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        # Same fluent pipeline as before, spelled out one stage at a time.
        source = env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST))
        hellos = source.filter(Filterer())
        pairs = hellos.flat_map(Tokenizer())
        keyed = pairs.key_by(Selector())
        windowed = keyed.time_window(milliseconds(100))
        summed = windowed.reduce(Sum())
        summed.print()

        env.execute(True)


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))

# --- archive metadata that shared this line (kept verbatim) ---
# http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py
# ----------------------------------------------------------------------
# diff --git
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py new file mode 100644 index 0000000..68ac3ef --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py @@ -0,0 +1,77 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
import sys
from utils import constants
from utils.python_test_base import TestBase
from utils.pygeneratorbase import PyGeneratorBase
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import seconds
from org.apache.flink.streaming.api import CheckpointingMode


class Generator(PyGeneratorBase):
    """Source generator that collects the integer 222 on every ``do`` call."""

    def __init__(self, num_iters):
        super(Generator, self).__init__(num_iters)

    def do(self, ctx):
        emitted = 222
        ctx.collect(emitted)


class Tokenizer(FlatMapFunction):
    """Wraps each incoming value into a ``(1, value)`` pair."""

    def flatMap(self, value, collector):
        pair = (1, value)
        collector.collect(pair)


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, left_val = input1
        right_count, _ = input2
        total = left_count + right_count
        return (total, left_val)


class Selector(KeySelector):
    """Keys a ``(count, value)`` pair by its value."""

    def getKey(self, input):
        key = input[1]
        return key


class Main(TestBase):
    """Job with checkpointing: generate ints -> pair -> key -> 1 s window -> sum -> print."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        # The source is created on whatever enable_checkpointing() returns,
        # exactly as in the original fluent chain.
        checkpointed = env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)
        source = checkpointed.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST))
        pairs = source.flat_map(Tokenizer())
        keyed = pairs.key_by(Selector())
        windowed = keyed.time_window(seconds(1))
        summed = windowed.reduce(Sum())
        summed.print()

        result = env.execute("MyJob", True)
        job_id = str(result.jobID)
        print("Job completed, job_id={}".format(job_id))


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))

# --- archive metadata that shared this line (kept verbatim) ---
# http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
# ----------------------------------------------------------------------
# diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py new file mode 100644 index 0000000..6f6db53 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py @@ -0,0 +1,76 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
import sys
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import seconds

from utils import constants
from utils.pygeneratorbase import PyGeneratorBase
from utils.python_test_base import TestBase


class Generator(PyGeneratorBase):
    """Source generator that collects the list ``[222, 333]`` on every ``do`` call."""

    def __init__(self, num_iters):
        super(Generator, self).__init__(num_iters)

    def do(self, ctx):
        batch = [222, 333]
        ctx.collect(batch)


class Tokenizer(FlatMapFunction):
    """Emits a ``(1, item)`` pair for every item of the incoming iterable."""

    def flatMap(self, value, collector):
        for item in value:
            pair = (1, item)
            collector.collect(pair)


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, left_val = input1
        right_count, _ = input2
        total = left_count + right_count
        return (total, left_val)


class Selector(KeySelector):
    """Keys a ``(count, value)`` pair by its value."""

    def getKey(self, input):
        key = input[1]
        return key


class Main(TestBase):
    """Job: generate int lists -> flatten to pairs -> key -> 1 s window -> sum -> print."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        source = env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST))
        pairs = source.flat_map(Tokenizer())
        keyed = pairs.key_by(Selector())
        windowed = keyed.time_window(seconds(1))
        summed = windowed.reduce(Sum())
        summed.print()

        env.execute(True)


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))

# --- archive metadata that shared this line (kept verbatim) ---
# http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
# ----------------------------------------------------------------------
# diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
# new file mode 100644
# index 0000000..9f5e7e5
# --- /dev/null
# +++
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py @@ -0,0 +1,76 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
import sys
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.pygeneratorbase import PyGeneratorBase
from utils.python_test_base import TestBase


class Generator(PyGeneratorBase):
    """Source generator that collects 'Hello World' on every ``do`` call."""

    def __init__(self, num_iters):
        super(Generator, self).__init__(num_iters)

    def do(self, ctx):
        sentence = 'Hello World'
        ctx.collect(sentence)


class Tokenizer(FlatMapFunction):
    """Emits a ``(1, word)`` pair for every lower-cased, whitespace-separated word."""

    def flatMap(self, value, collector):
        lowered = value.lower()
        for token in lowered.split():
            collector.collect((1, token))


class Selector(KeySelector):
    """Keys a ``(count, word)`` pair by its word."""

    def getKey(self, input):
        word = input[1]
        return word


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, word)`` pairs, keeping the first word."""

    def reduce(self, input1, input2):
        left_count, left_word = input1
        right_count, _ = input2
        total = left_count + right_count
        return (total, left_word)


class Main(TestBase):
    """Job: generate sentences -> tokenize -> key by word -> 50 ms window -> sum -> print."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        source = env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST))
        pairs = source.flat_map(Tokenizer())
        keyed = pairs.key_by(Selector())
        windowed = keyed.time_window(milliseconds(50))
        summed = windowed.reduce(Sum())
        summed.print()

        env.execute(True)


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))

# --- archive metadata that shared this line (kept verbatim) ---
# http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
# ----------------------------------------------------------------------
# diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
# new file mode 100644
# index
0000000..67b1a42 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py @@ -0,0 +1,67 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.python_test_base import TestBase


class Tokenizer(FlatMapFunction):
    """Wraps each incoming value into a ``(1, value)`` pair."""

    def flatMap(self, value, collector):
        pair = (1, value)
        collector.collect(pair)


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, left_val = input1
        right_count, _ = input2
        total = left_count + right_count
        return (total, left_val)


class Selector(KeySelector):
    """Keys a ``(count, value)`` pair by its value."""

    def getKey(self, input):
        key = input[1]
        return key


class Main(TestBase):
    """Job: collection of 'aa'/'bbb' strings -> pair -> key -> 10 ms window -> sum -> print."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        # Even positions hold "aa", odd positions hold "bbb".
        elements = []
        for position in range(constants.NUM_ELEMENTS_IN_TEST):
            if position % 2 == 0:
                elements.append("aa")
            else:
                elements.append("bbb")

        env = self._get_execution_environment()
        (env.from_collection(elements)
            .flat_map(Tokenizer())
            .key_by(Selector())
            .time_window(milliseconds(10))
            .reduce(Sum())
            .print())

        result = env.execute("MyJob", True)
        job_id = result.jobID
        print("Job completed, job_id={}".format(job_id))


def main():
    Main().run()


if __name__ == '__main__':
    main()

# --- archive metadata that shared this line (kept verbatim) ---
# http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
# ----------------------------------------------------------------------
# diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
# new file mode 100644
# index 0000000..59f8cf7
# --- /dev/null
# +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
# @@ -0,0 +1,67 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.python_test_base import TestBase


class Tokenizer(FlatMapFunction):
    """Wraps each incoming value into a ``(1, value)`` pair."""

    def flatMap(self, value, collector):
        pair = (1, value)
        collector.collect(pair)


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, left_val = input1
        right_count, _ = input2
        total = left_count + right_count
        return (total, left_val)


class Selector(KeySelector):
    """Keys a ``(count, (name, number))`` pair by the name."""

    def getKey(self, input):
        payload = input[1]
        return payload[0]


class Main(TestBase):
    """Job: ("Alice", 111)/("Bob", 2222) tuples -> pair -> key by name -> 10 ms window -> sum -> print."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        # Even positions hold Alice's tuple, odd positions hold Bob's.
        elements = []
        for position in range(constants.NUM_ELEMENTS_IN_TEST):
            if position % 2 == 0:
                elements.append(("Alice", 111))
            else:
                elements.append(("Bob", 2222))

        env = self._get_execution_environment()
        (env.from_elements(*elements)
            .flat_map(Tokenizer())
            .key_by(Selector())
            .time_window(milliseconds(10))
            .reduce(Sum())
            .print())

        result = env.execute("MyJob", True)
        job_id = result.jobID
        print("Job completed, job_id={}".format(job_id))


def main():
    Main().run()


if __name__ == '__main__':
    main()

# --- archive metadata that shared these lines (kept verbatim) ---
# http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
# ----------------------------------------------------------------------
# diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
# new file mode 100644
# index 0000000..9e615d7
# --- /dev/null
# +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
# @@ -0,0 +1,84 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from java.util import NoSuchElementException
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.python.util import PythonIterator
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.python_test_base import TestBase


class SomeIterator(PythonIterator):
    """Iterator yielding ``n`` integers: 111 at even positions, 222 at odd ones."""

    def __init__(self, n):
        self._iii = 0   # position of the next element to hand out
        self._nnn = n   # total number of elements

    def hasNext(self):
        remaining = self._iii < self._nnn
        return remaining

    def next(self):
        # Guard first: an exhausted iterator raises instead of returning.
        if not (self._iii < self._nnn):
            raise NoSuchElementException()

        position = self._iii
        self._iii += 1
        if position % 2 == 0:
            return 111
        return 222


class Tokenizer(FlatMapFunction):
    """Wraps each incoming value into a ``(1, value)`` pair."""

    def flatMap(self, value, collector):
        pair = (1, value)
        collector.collect(pair)


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, left_val = input1
        right_count, _ = input2
        total = left_count + right_count
        return (total, left_val)


class Selector(KeySelector):
    """Keys a ``(count, value)`` pair by its value."""

    def getKey(self, input):
        key = input[1]
        return key


class Main(TestBase):
    """Job: SomeIterator -> pair -> key -> 10 ms window -> sum -> print."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        numbers = env.from_collection(SomeIterator(constants.NUM_ITERATIONS_IN_TEST))
        pairs = numbers.flat_map(Tokenizer())
        keyed = pairs.key_by(Selector())
        windowed = keyed.time_window(milliseconds(10))
        summed = windowed.reduce(Sum())
        summed.print()

        result = env.execute("MyJob", True)
        job_id = result.jobID
        print("Job completed, job_id={}".format(job_id))


def main():
    Main().run()


if __name__ == '__main__':
    main()

# --- archive metadata that shared this line (kept verbatim) ---
# http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py
# ----------------------------------------------------------------------
# diff --git
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py new file mode 100644 index 0000000..2e24464 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py @@ -0,0 +1,65 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.python_test_base import TestBase


class Tokenizer(FlatMapFunction):
    """Wraps each incoming value into a ``(1, value)`` pair."""

    def flatMap(self, value, collector):
        pair = (1, value)
        collector.collect(pair)


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, left_val = input1
        right_count, _ = input2
        total = left_count + right_count
        return (total, left_val)


class Selector(KeySelector):
    """Maps every element to the same constant key, 1."""

    def getKey(self, input):
        constant_key = 1
        return constant_key


class Main(TestBase):
    """Job: generated sequence -> pair -> single key -> 10 ms window -> sum -> print."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        sequence = env.generate_sequence(1, constants.NUM_ITERATIONS_IN_TEST)
        pairs = sequence.flat_map(Tokenizer())
        keyed = pairs.key_by(Selector())
        windowed = keyed.time_window(milliseconds(10))
        summed = windowed.reduce(Sum())
        summed.print()

        result = env.execute("MyJob", True)
        job_id = result.jobID
        print("Job completed, job_id={}".format(job_id))


def main():
    Main().run()


if __name__ == '__main__':
    main()

# --- archive metadata that shared this line (kept verbatim) ---
# http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
# ----------------------------------------------------------------------
# diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
# new file mode 100644
# index 0000000..eea8d5c
# --- /dev/null
# +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
# @@ -0,0 +1,69 @@
################################################################################
# Licensed to the Apache Software
# Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from org.apache.flink.api.common.functions import FilterFunction
from org.apache.flink.api.common.functions import MapFunction

from utils import constants
from utils.python_test_base import TestBase


class MinusOne(MapFunction):
    """Maps a number to that number minus one."""

    def map(self, value):
        decremented = value - 1
        return decremented


class PositiveNumber(FilterFunction):
    """Keeps values strictly greater than zero."""

    def filter(self, value):
        is_positive = value > 0
        return is_positive


class LessEquelToZero(FilterFunction):
    """Keeps values less than or equal to zero."""

    def filter(self, value):
        is_done = value <= 0
        return is_done


class Main(TestBase):
    """Job: iterate over five 2s, decrementing until each value is <= 0, then print it."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        # Loop head: five copies of the integer 2, fed into an iteration.
        iterative_stream = env.from_collection([2, 2, 2, 2, 2]).iterate(constants.MAX_EXECUTION_TIME_MS)

        decremented = iterative_stream.map(MinusOne())

        # Values that are still positive are fed back into the loop ...
        iterative_stream.close_with(decremented.filter(PositiveNumber()))

        # ... while the others leave the loop and are printed.
        decremented.filter(LessEquelToZero()).print()

        result = env.execute("MyJob", True)
        job_id = result.jobID
        print("Job completed, job_id={}".format(job_id))


def main():
    Main().run()


if __name__ == '__main__':
    main()

# --- archive metadata that shared these lines (kept verbatim) ---
# http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
# ----------------------------------------------------------------------
# diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
# new file mode 100644
# index 0000000..1be7c17
# --- /dev/null
# +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
# @@ -0,0 +1,157 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import threading
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.api.java.utils import ParameterTool
from org.apache.flink.streaming.python.connectors import PythonFlinkKafkaProducer09, PythonFlinkKafkaConsumer09
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
from org.apache.flink.streaming.util.serialization import DeserializationSchema
from org.apache.flink.streaming.util.serialization import SerializationSchema

from utils import constants
from utils import utils
from utils.python_test_base import TestBase

KAFKA_DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"

# Topic shared by the producer job and the consumer job of this test.
KAFKA_TEST_TOPIC = "kafka09-test"


class StringGenerator(SourceFunction):
    """Source that emits ``msg`` repeatedly and finishes with ``end_msg``."""

    def __init__(self, msg, end_msg, num_iters=7000):
        self._running = True
        self._msg = msg
        self._end_msg = end_msg
        self._num_iters = num_iters

    def run(self, ctx):
        """Collect ``num_iters - 1`` copies of ``msg``, then ``end_msg``."""
        counter = 0
        while self._running and counter < self._num_iters - 1:
            counter += 1
            ctx.collect(self._msg)
        # The end marker is emitted even when the loop stopped because of
        # cancel(), so a reader waiting for it is not left hanging.
        ctx.collect(self._end_msg)

    def cancel(self):
        """Ask the emit loop in run() to stop."""
        self._running = False


class ToStringSchema(SerializationSchema):
    """Serialization schema that writes ``str(value)``."""

    def serialize(self, value):
        return str(value)


class KafkaStringProducer(threading.Thread, TestBase):
    """Thread running a job that sinks generated strings to Kafka.

    NOTE(review): only ``threading.Thread.__init__`` is invoked here, not
    ``TestBase.__init__`` -- confirm TestBase needs no initialisation.
    """

    def __init__(self, bootstrap_server, msg, end_msg, num_iters):
        threading.Thread.__init__(self)
        self._bootstrap_server = bootstrap_server
        self._msg = msg
        self._end_msg = end_msg
        self._num_iters = num_iters

    def run(self):
        env = self._get_execution_environment()

        # Honour the constructor arguments: the previous code ignored them
        # and always used the module-level server constant and 100 messages.
        stream = env.create_python_source(
            StringGenerator(self._msg, self._end_msg, num_iters=self._num_iters))

        producer = PythonFlinkKafkaProducer09(self._bootstrap_server, KAFKA_TEST_TOPIC, ToStringSchema())
        producer.set_log_failures_only(False)  # "False" by default
        producer.set_flush_on_checkpoint(True)  # "True" by default

        stream.add_sink(producer)

        result = env.execute("Kafka09 producer test")
        print("Kafka09 producer job completed, job_id={}".format(result.jobID))


class StringDeserializationSchema(DeserializationSchema):
    """Turns a sequence of byte values into a string; "quit" ends the stream."""

    def deserialize(self, message):
        # NOTE(review): chr() rejects negative values; presumably the test
        # payload is plain ASCII -- confirm before sending other data.
        return ''.join(map(chr, message))

    def isEndOfStream(self, element):
        return str(element) == "quit"


class Tokenizer(FlatMapFunction):
    """Emits ``(1, word)`` for every lower-cased whitespace-separated word."""

    def flatMap(self, value, collector):
        for word in value.lower().split():
            collector.collect((1, word))


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        count1, val1 = input1
        count2, _ = input2
        return (count1 + count2, val1)


class Selector(KeySelector):
    """Keys a ``(count, value)`` pair by its value."""

    def getKey(self, input):
        return input[1]


class KafkaStringConsumer(threading.Thread, TestBase):
    """Thread running a job that reads the test topic and counts words.

    NOTE(review): as in KafkaStringProducer, ``TestBase.__init__`` is not
    invoked -- confirm that is intended.
    """

    def __init__(self, bootstrap_server):
        threading.Thread.__init__(self)
        self._bootstrap_server = bootstrap_server

    def run(self):
        parameterTool = ParameterTool.fromArgs(sys.argv[1:])
        props = parameterTool.getProperties()
        props.setProperty("bootstrap.servers", self._bootstrap_server)

        consumer = PythonFlinkKafkaConsumer09(KAFKA_TEST_TOPIC, StringDeserializationSchema(), props)

        env = self._get_execution_environment()
        env.add_java_source(consumer) \
            .flat_map(Tokenizer()) \
            .key_by(Selector()) \
            .time_window(milliseconds(100)) \
            .reduce(Sum()) \
            .print()

        result = env.execute("Python consumer kafka09 test", True)
        print("Kafka09 consumer job completed, job_id={}".format(result.jobID))


class Main(TestBase):
    """Runs the producer and the consumer job concurrently against Kafka."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        host, port = KAFKA_DEFAULT_BOOTSTRAP_SERVERS.split(":")
        # Without a broker the test is skipped instead of failing.
        if not utils.is_reachable(host, int(port)):
            print("Kafka server is not reachable: [{}]".format(KAFKA_DEFAULT_BOOTSTRAP_SERVERS))
            return

        kafka_p = KafkaStringProducer(KAFKA_DEFAULT_BOOTSTRAP_SERVERS, "Hello World", "quit", constants.NUM_ITERATIONS_IN_TEST)
        kafka_c = KafkaStringConsumer(KAFKA_DEFAULT_BOOTSTRAP_SERVERS)

        kafka_p.start()
        kafka_c.start()

        kafka_p.join()
        kafka_c.join()


def main():
    Main().run()


if __name__ == '__main__':
    main()
import sys
from org.apache.flink.api.common.functions import ReduceFunction, FlatMapFunction
from org.apache.flink.api.java.functions import KeySelector

from utils import constants
from utils.python_test_base import TestBase


class Tokenizer(FlatMapFunction):
    """Emits ``(1, item)`` for every item of the iterable input value."""

    def flatMap(self, value, collector):
        for item in value:
            pair = (1, item)
            collector.collect(pair)


class Selector(KeySelector):
    """Uses the second field of the record as the key."""

    def getKey(self, input):
        return input[1]


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, kept_value = input1
        right_count, _ = input2
        total = left_count + right_count
        return (total, kept_value)


class Main(TestBase):
    """Builds and executes the keyed-stream reduce job."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        # Records alternate between 222 (even index) and 333 (odd index).
        elements = [(1, 333 if idx % 2 else 222)
                    for idx in range(constants.NUM_ELEMENTS_IN_TEST)]

        env = self._get_execution_environment()
        source = env.set_parallelism(2).from_elements(elements)
        keyed = source.flat_map(Tokenizer()).key_by(Selector())
        keyed.reduce(Sum()).print()

        result = env.execute("MyJob", True)
        print("Job completed, job_id={}".format(str(result.jobID)))


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
from utils.python_test_base import TestBase
from utils.pygeneratorbase import PyGeneratorBase
from org.apache.flink.api.common.functions import MapFunction, FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds


class Generator(PyGeneratorBase):
    """Generator whose ``do`` step collects the constant 222.

    Not referenced by the job assembled in ``Main.run`` of this module.
    """

    def __init__(self, num_iters):
        super(Generator, self).__init__(num_iters)

    def do(self, ctx):
        ctx.collect(222)


class DummyTupple(MapFunction):
    """Duplicates the value into a two-field tuple."""

    def map(self, value):
        return (value, value)


class MinusOne(MapFunction):
    """Returns the first field of the record decreased by one."""

    def map(self, value):
        first = value[0]
        return first - 1


class Tokenizer(FlatMapFunction):
    """Emits a single ``(1, value)`` pair per input."""

    def flatMap(self, value, collector):
        pair = (1, value)
        collector.collect(pair)


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, kept_value = input1
        right_count, _ = input2
        return (left_count + right_count, kept_value)


class Selector(KeySelector):
    """Uses the second field of the record as the key."""

    def getKey(self, input):
        return input[1]


class Main(TestBase):
    """Builds and executes the map / flat-map / windowed-reduce job."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        numbers = env.from_collection([3] * 5)
        mapped = numbers.map(DummyTupple()).map(MinusOne())
        keyed = mapped.flat_map(Tokenizer()).key_by(Selector())
        keyed.time_window(milliseconds(5)).reduce(Sum()).print()

        result = env.execute("MyJob", True)
        print("Job completed, job_id={}".format(str(result.jobID)))


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))
import sys
from org.apache.flink.api.common.functions import MapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import seconds

from utils import constants
from utils.pygeneratorbase import PyGeneratorBase
from utils.python_test_base import TestBase


class Generator(PyGeneratorBase):
    """Generator whose ``do`` step alternates between 10 and -10."""

    def __init__(self, num_iters):
        super(Generator, self).__init__(num_iters)
        self._alternator = True

    def do(self, ctx):
        if self._alternator:
            number = 10
        else:
            number = -10
        ctx.collect(number)
        # Flip so the next call emits the other sign.
        self._alternator = not self._alternator


class Tokenizer(MapFunction):
    """Maps a number to ``(1, 2 * number)``."""

    def map(self, value):
        doubled = 2 * value
        return (1, doubled)


class Selector(KeySelector):
    """Uses the second field of the record as the key."""

    def getKey(self, input):
        return input[1]


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, kept_value = input1
        right_count, _ = input2
        return (left_count + right_count, kept_value)


class Main(TestBase):
    """Builds and executes the integer map job."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        generator = Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)
        keyed = env.create_python_source(generator).map(Tokenizer()).key_by(Selector())
        keyed.time_window(seconds(1)).reduce(Sum()).print()

        env.execute(True)


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))
0000000..36c86c9 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py @@ -0,0 +1,82 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import os
import re
import sys
import tempfile
import uuid
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.python_test_base import TestBase


class Tokenizer(FlatMapFunction):
    """Strips all whitespace, splits on commas and emits ``(1, word)`` pairs."""

    def flatMap(self, value, collector):
        for word in re.sub(r'\s', '', value).split(','):
            collector.collect((1, word))


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, word)`` pairs, keeping the first word."""

    def reduce(self, input1, input2):
        count1, word1 = input1
        count2, _ = input2
        return (count1 + count2, word1)


class Selector(KeySelector):
    """Keys a ``(count, word)`` pair by its word."""

    def getKey(self, input):
        return input[1]


def generate_tmp_text_file(num_lines=100):
    """Create a uniquely named temporary text file holding ``num_lines`` lines.

    The content is flushed before returning, so a reader that opens the file
    by name sees every line even though the handle is still open.

    :param num_lines: number of identical comma-separated lines to write.
    :return: the open file object; the caller closes it and removes the file
        named by its ``name`` attribute.
    """
    # gettempdir() replaces the hard-coded "/tmp"; ``hex`` replaces the
    # Python-2-only ``get_hex()`` accessor.
    path = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex)
    tmp_f = open(path, 'w')
    try:
        for _ in range(num_lines):
            tmp_f.write('111, 222, 333, 444, 555, 666, 777\n')
        # Without this the data may still sit in the write buffer while the
        # job reads the file, yielding an empty or truncated input.
        tmp_f.flush()
    except Exception:
        # Do not leak the handle when writing fails.
        tmp_f.close()
        raise
    return tmp_f


class Main(TestBase):
    """Builds and executes the read-text-file word count job."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        tmp_f = generate_tmp_text_file(constants.NUM_ELEMENTS_IN_TEST)
        try:
            env = self._get_execution_environment()
            env.read_text_file(tmp_f.name) \
                .flat_map(Tokenizer()) \
                .key_by(Selector()) \
                .time_window(milliseconds(100)) \
                .reduce(Sum()) \
                .print()

            env.execute(True)
        finally:
            # Always remove the temporary input, even when the job fails.
            tmp_f.close()
            os.unlink(tmp_f.name)


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py new file mode 100644 index 0000000..726d69f --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py @@ -0,0 +1,95 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import socket
import sys
import threading
import time
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import seconds

from utils import constants
from utils import utils
from utils.python_test_base import TestBase


class SocketStringGenerator(threading.Thread):
    """Thread serving ``msg`` ``num_iters`` times to the first client.

    A trailing newline is appended to ``msg`` when it is missing.
    """

    def __init__(self, host, port, msg, num_iters):
        threading.Thread.__init__(self)
        self._host = host
        self._port = port
        # endswith() also copes with an empty message, where msg[-1] raised.
        self._msg = msg if msg.endswith('\n') else msg + '\n'
        self._num_iters = num_iters

    def run(self):
        """Accept one connection, send the messages, then close both sockets."""
        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            serversocket.bind((self._host, self._port))
            serversocket.listen(5)
            clientsocket, _address = serversocket.accept()
            try:
                for _ in range(self._num_iters):
                    # send() may transmit only part of the buffer; sendall()
                    # keeps going until the whole message is written.
                    clientsocket.sendall(self._msg)
            finally:
                clientsocket.close()
        finally:
            # Release the listening port even when bind/accept/send fails.
            serversocket.close()


class Tokenizer(FlatMapFunction):
    """Emits ``(1, word)`` for every lower-cased whitespace-separated word."""

    def flatMap(self, value, collector):
        for word in value.lower().split():
            collector.collect((1, word))


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, word)`` pairs, keeping the first word."""

    def reduce(self, input1, input2):
        count1, word1 = input1
        count2, _ = input2
        return (count1 + count2, word1)


class Selector(KeySelector):
    """Keys a ``(count, word)`` pair by its word."""

    def getKey(self, input):
        return input[1]


class Main(TestBase):
    """Starts the socket server and runs a word count over its output."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        f_port = utils.gen_free_port()
        SocketStringGenerator(host='', port=f_port, msg='Hello World', num_iters=constants.NUM_ITERATIONS_IN_TEST).start()
        # Presumably gives the server thread time to start listening before
        # the job connects.
        time.sleep(0.5)

        env = self._get_execution_environment()
        env.socket_text_stream('localhost', f_port) \
            .flat_map(Tokenizer()) \
            .key_by(Selector()) \
            .time_window(seconds(1)) \
            .reduce(Sum()) \
            .print()

        env.execute(True)


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.python_test_base import TestBase


class StreamSelector(OutputSelector):
    """Routes values below half the iteration count to 'lower_stream'."""

    def select(self, value):
        if value < constants.NUM_ITERATIONS_IN_TEST / 2:
            return ['lower_stream']
        return ['upper_stream']


class Tokenizer(FlatMapFunction):
    """Emits a single ``(1, value)`` pair per input."""

    def flatMap(self, value, collector):
        pair = (1, value)
        collector.collect(pair)


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, kept_value = input1
        right_count, _ = input2
        return (left_count + right_count, kept_value)


class Selector(KeySelector):
    """Assigns every record to the same key (1)."""

    def getKey(self, input):
        return 1


class Main(TestBase):
    """Splits a generated sequence in two and reduces each half separately."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        sequence = env.generate_sequence(1, constants.NUM_ITERATIONS_IN_TEST)
        split_window = sequence.split(StreamSelector())

        # The lower half is wired up first, then the upper half.
        lower = split_window.select('lower_stream')
        lower.flat_map(Tokenizer()).key_by(Selector()) \
            .time_window(milliseconds(10)).reduce(Sum()).print()

        upper = split_window.select('upper_stream')
        upper.flat_map(Tokenizer()).key_by(Selector()) \
            .time_window(milliseconds(10)).reduce(Sum()).print()

        result = env.execute("MyJob", True)
        print("Job completed, job_id={}".format(result.jobID))


def main():
    Main().run()


if __name__ == '__main__':
    main()
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py new file mode 100644 index 0000000..d70f87c --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py @@ -0,0 +1,77 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.pygeneratorbase import PyGeneratorBase
from utils.python_test_base import TestBase


class Generator(PyGeneratorBase):
    """Generator whose ``do`` step collects a fixed message."""

    def __init__(self, msg, num_iters):
        super(Generator, self).__init__(num_iters)
        self._msg = msg

    def do(self, ctx):
        ctx.collect(self._msg)


class Tokenizer(FlatMapFunction):
    """Emits a single ``(1, value)`` pair per input."""

    def flatMap(self, value, collector):
        pair = (1, value)
        collector.collect(pair)


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, kept_value = input1
        right_count, _ = input2
        return (left_count + right_count, kept_value)


class Selector(KeySelector):
    """Assigns every record to the same key (1)."""

    def getKey(self, input):
        return 1


class Main(TestBase):
    """Unions three generated streams and reduces them in time windows."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()
        num_iters = constants.NUM_ITERATIONS_IN_TEST

        # Sources are created in the same order as before: Hello, World, Happy.
        seq1 = env.create_python_source(Generator(msg='Hello', num_iters=num_iters))
        seq2 = env.create_python_source(Generator(msg='World', num_iters=num_iters))
        seq3 = env.create_python_source(Generator(msg='Happy', num_iters=num_iters))

        merged = seq1.union(seq2, seq3)
        keyed = merged.flat_map(Tokenizer()).key_by(Selector())
        keyed.time_window(milliseconds(10)).reduce(Sum()).print()

        result = env.execute("My python union stream test", True)
        print("Job completed, job_id={}".format(result.jobID))


def main():
    Main().run()


if __name__ == '__main__':
    main()
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py new file mode 100644 index 0000000..3ace000 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py @@ -0,0 +1,86 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import sys
from org.apache.flink.api.common.functions import MapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.pygeneratorbase import PyGeneratorBase
from utils.python_test_base import TestBase


class Person:
    """User-defined record with a name and an age."""

    def __init__(self, name, age):
        self.name = name
        self.age = age

    def __repr__(self):
        template = "{} (age: {})"
        return template.format(self.name, self.age)


class Generator(PyGeneratorBase):
    """Generator whose ``do`` step alternates between two Person records."""

    def __init__(self, num_iters):
        super(Generator, self).__init__(num_iters)
        self._alternator = True

    def do(self, ctx):
        if self._alternator:
            person = Person("Avi", 47)
        else:
            person = Person("Bob", 33)
        ctx.collect(person)
        # Flip so the next call emits the other person.
        self._alternator = not self._alternator


class Tokenizer(MapFunction):
    """Wraps the value into a ``(1, value)`` pair."""

    def map(self, value):
        return (1, value)


class Selector(KeySelector):
    """Keys a ``(count, person)`` pair by the person's name."""

    def getKey(self, input):
        person = input[1]
        return person.name


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, kept_value = input1
        right_count, _ = input2
        return (left_count + right_count, kept_value)


class Main(TestBase):
    """Builds and executes the user-type job."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        generator = Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)
        keyed = env.create_python_source(generator).map(Tokenizer()).key_by(Selector())
        keyed.time_window(milliseconds(30)).reduce(Sum()).print()

        env.execute(True)


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py new file mode 100644 index 0000000..8a651df --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py @@ -0,0 +1,72 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import sys
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.functions.windowing import WindowFunction
from org.apache.flink.streaming.api.windowing.time.Time import seconds

from utils import constants
from utils.pygeneratorbase import PyGeneratorBase
from utils.python_test_base import TestBase


class Generator(PyGeneratorBase):
    """Generator whose ``do`` step alternates between a 'key1' and a 'key2' record."""

    def __init__(self, num_iters):
        super(Generator, self).__init__(num_iters)
        self._alternator = True

    def do(self, ctx):
        record = (('key1', 'Any specific text here') if self._alternator
                  else ('key2', 'Any other specific text here'))
        ctx.collect(record)
        # Flip so the next call emits the other key.
        self._alternator = not self._alternator


class Selector(KeySelector):
    """Uses the first field of the record as the key."""

    def getKey(self, input):
        return input[0]


class WindowSum(WindowFunction):
    """Emits ``(key, number of values)`` for each window."""

    def apply(self, key, window, values, collector):
        # NOTE(review): relies on ``values`` supporting len() -- confirm the
        # runtime hands over a sized collection.
        size = len(values)
        collector.collect((key, size))


class Main(TestBase):
    """Builds and executes the window-apply job."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        env = self._get_execution_environment()

        generator = Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)
        windowed = env.create_python_source(generator).key_by(Selector()).time_window(seconds(1))
        windowed.apply(WindowSum()).print()

        env.execute(True)


def main():
    Main().run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py @@ -0,0 +1,84 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
import sys
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.python.api.environment import PythonStreamExecutionEnvironment
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds


class Generator(SourceFunction):
    """Source that emits the string 'Hello World' up to ``num_iters`` times."""

    def __init__(self, num_iters):
        self._running = True
        self._num_iters = num_iters

    def run(self, ctx):
        """Collect records until the budget is spent or ``cancel`` is called."""
        remaining = self._num_iters
        while self._running and remaining > 0:
            ctx.collect('Hello World')
            remaining -= 1

    def cancel(self):
        """Ask ``run`` to stop before its next iteration."""
        self._running = False


class Tokenizer(FlatMapFunction):
    """Splits a line into lower-cased words and emits ``(1, word)`` per word."""

    def flatMap(self, value, collector):
        words = value.lower().split()
        for word in words:
            collector.collect((1, word))


class Selector(KeySelector):
    """Key selector returning the second field (the word) of each pair."""

    def getKey(self, input):
        return input[1]


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, word)`` pairs, keeping the first word."""

    def reduce(self, input1, input2):
        left_count, word = input1
        right_count, _ = input2
        return (left_count + right_count, word)


class Main:
    """Word-count job over the generated stream, windowed by 30 milliseconds."""

    def __init__(self, local):
        self._local = local

    def run(self):
        """Build the word-count pipeline, report the execution mode and run it."""
        env = PythonStreamExecutionEnvironment.get_execution_environment()
        words = env.create_python_source(Generator(num_iters=100)).flat_map(Tokenizer())
        windowed = words.key_by(Selector()).time_window(milliseconds(30))
        windowed.reduce(Sum()).print()

        if self._local:
            mode = "LOCAL"
        else:
            mode = "REMOTE"
        print("Execution mode: {}".format(mode))
        env.execute(self._local)


def main():
    """Run locally unless the first command-line argument is "remote"."""
    remote_requested = len(sys.argv) > 1 and sys.argv[1] == "remote"
    local = not remote_requested
    Main(local).run()


if __name__ == '__main__':
    main()
    print("Job completed ({})\n".format(sys.argv))
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py new file mode 100644 index 0000000..9a34d06 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py @@ -0,0 +1,68 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.core.fs.FileSystem import WriteMode
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from utils import constants
from utils.python_test_base import TestBase


class Tokenizer(FlatMapFunction):
    """Wraps every incoming value as a ``(1, value)`` pair."""

    def flatMap(self, value, collector):
        pair = (1, value)
        collector.collect(pair)


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        left_count, value = input1
        right_count, _ = input2
        return (left_count + right_count, value)


class Selector(KeySelector):
    """Key selector returning the second field of each pair."""

    def getKey(self, input):
        return input[1]


class Main(TestBase):
    """Test job that counts alternating strings and writes the result as text."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        """Build the collection -> count -> write_as_text job and run it."""
        # Even indices yield "aa", odd indices yield "bbb".
        elements = ["bbb" if idx % 2 else "aa" for idx in range(constants.NUM_ITERATIONS_IN_TEST)]

        env = self._get_execution_environment()
        pairs = env.from_collection(elements).flat_map(Tokenizer())
        windowed = pairs.key_by(Selector()).time_window(milliseconds(10))
        windowed.reduce(Sum()).write_as_text("/tmp/flink_write_as_text", WriteMode.OVERWRITE)

        # NOTE(review): the boolean presumably selects local execution - confirm.
        result = env.execute("MyJob", True)
        print("Job completed, job_id={}".format(result.jobID))


def main():
    """Entry point used both by the test runner and by direct execution."""
    job = Main()
    job.run()


if __name__ == '__main__':
    main()
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py @@ -0,0 +1,106 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
import socket
import threading
import time
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
from org.apache.flink.streaming.util.serialization import SerializationSchema

from utils import constants
from utils import utils
from utils.python_test_base import TestBase


class Tokenizer(FlatMapFunction):
    """Wraps every incoming value as a ``(1, value)`` pair."""

    def flatMap(self, value, collector):
        collector.collect((1, value))


class Sum(ReduceFunction):
    """Adds the counts of two ``(count, value)`` pairs, keeping the first value."""

    def reduce(self, input1, input2):
        count1, val1 = input1
        count2, _ = input2
        return (count1 + count2, val1)


class Selector(KeySelector):
    """Key selector returning the second field of each pair."""

    def getKey(self, input):
        return input[1]


class ToStringSchema(SerializationSchema):
    """Serializes a pair as ``"<first>, <second>|"``; '|' terminates a record."""

    def serialize(self, value):
        return "{}, {}|".format(value[0], value[1])


class SocketStringReader(threading.Thread):
    """Thread that accepts one TCP connection and prints '|'-terminated records."""

    def __init__(self, host, port, expected_num_values):
        threading.Thread.__init__(self)
        self._host = host
        self._port = port
        # Stored for callers; run() does not use it and reads until EOF.
        self._expected_num_values = expected_num_values

    def run(self):
        """Serve a single client, printing each complete record it sends.

        A stream socket does not preserve record boundaries, so a record may
        be split across two ``recv`` calls.  Bytes after the last '|' of a
        chunk are therefore kept and prepended to the next chunk instead of
        being discarded.  Both sockets are closed even when an error occurs.
        """
        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        clientsocket = None
        try:
            serversocket.bind((self._host, self._port))
            serversocket.listen(5)
            (clientsocket, address) = serversocket.accept()

            # Incomplete record carried over from the previous chunk.
            pending = ''
            while True:
                msg = clientsocket.recv(1024)
                if not msg:
                    # Peer closed the connection.
                    break
                records = (pending + msg).split('|')
                # The last piece is either '' (chunk ended on '|') or the
                # beginning of a record that continues in the next chunk.
                pending = records.pop()
                for v in records:
                    print(v)

            print("*** Done receiving ***")
        finally:
            if clientsocket is not None:
                clientsocket.close()
            serversocket.close()


class Main(TestBase):
    """Test job that counts alternating strings and writes them to a socket."""

    def __init__(self):
        super(Main, self).__init__()

    def run(self):
        """Start the socket reader, then build and run the write_to_socket job."""
        port = utils.gen_free_port()
        SocketStringReader('', port, constants.NUM_ITERATIONS_IN_TEST).start()
        # NOTE(review): presumably gives the reader thread time to start
        # listening before the job connects - confirm.
        time.sleep(0.5)

        elements = ["aa" if iii % 2 == 0 else "bbb" for iii in range(constants.NUM_ITERATIONS_IN_TEST)]

        env = self._get_execution_environment()
        env.from_collection(elements) \
            .flat_map(Tokenizer()) \
            .key_by(Selector()) \
            .time_window(milliseconds(50)) \
            .reduce(Sum()) \
            .write_to_socket('localhost', port, ToStringSchema())

        # NOTE(review): the boolean presumably selects local execution - confirm.
        result = env.execute("MyJob", True)
        print("Job completed, job_id={}".format(result.jobID))


def main():
    """Entry point used both by the test runner and by direct execution."""
    Main().run()


if __name__ == '__main__':
    main()
+################################################################################ http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/constants.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/constants.py b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/constants.py new file mode 100644 index 0000000..da1278c --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/constants.py @@ -0,0 +1,21 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################

# Shared sizing constants for the streaming Python API tests.

# Number of records a test source generates, or of elements in a test collection.
NUM_ITERATIONS_IN_TEST = 100

# NOTE(review): the two values below are not referenced by the tests visible
# alongside this module; their intended use should be confirmed with callers.
NUM_ELEMENTS_IN_TEST = 10
MAX_EXECUTION_TIME_MS = 1000
################################################################################
from org.apache.flink.streaming.api.functions.source import SourceFunction


class PyGeneratorBase(SourceFunction):
    """Base source that calls ``do(ctx)`` a bounded number of times.

    Subclasses override ``do`` to emit records; the default ``do`` does
    nothing.
    """

    def __init__(self, num_iters=7000):
        self._running = True
        self._num_iters = num_iters

    def run(self, ctx):
        """Invoke ``do`` until the budget is spent or ``cancel`` is called."""
        remaining = self._num_iters
        while self._running and remaining > 0:
            self.do(ctx)
            remaining -= 1

    def do(self, ctx):
        """Hook executed once per iteration; a no-op in the base class."""
        pass

    def cancel(self):
        """Ask ``run`` to stop before its next iteration."""
        self._running = False
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
from org.apache.flink.api.java.utils import ParameterTool
from org.apache.flink.streaming.python.api.environment import PythonStreamExecutionEnvironment


class TestBase(object):
    """Common base for the test jobs; chooses the execution environment."""

    # Parsed once, when the class is defined: a ParameterTool built from the
    # command-line arguments, or None when no arguments were given.
    _params = ParameterTool.fromArgs(sys.argv[1:]) if sys.argv[1:] else None

    def __init__(self):
        pass

    def _get_execution_environment(self):
        """Return the environment the test job should be built on.

        Without parsed parameters the default environment is returned;
        otherwise a local environment is created from their configuration.
        """
        if not TestBase._params:
            print("Get execution environment")
            return PythonStreamExecutionEnvironment.get_execution_environment()

        print("Create local execution environment with provided configurations")
        configuration = TestBase._params.getConfiguration()
        return PythonStreamExecutionEnvironment.create_local_execution_environment(configuration)
