http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py
new file mode 100644
index 0000000..1970918
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py
@@ -0,0 +1,75 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from os.path import dirname, join, basename
+from glob import glob
+from org.apache.flink.runtime.client import JobExecutionException
+
+
+excluded_tests = [
+    'test_kafka09',
+]
+
+
+class Main:
+    def __init__(self):
+        pass
+
+    def run(self):
+        tests = []
+        pwd = dirname(sys.argv[0])
+        print("Working directory: {}".format(pwd))
+
+        if excluded_tests:
+            print("Excluded tests: {}\n".format(excluded_tests))
+
+        for x in glob(join(pwd, 'test_*.py')):
+            if not x.startswith('__'):
+                test_module_name = basename(x)[:-3]
+                if test_module_name not in excluded_tests:
+                    tests.append(__import__(test_module_name, globals(), 
locals()))
+
+        failed_tests = []
+        for test in tests:
+            print("Submitting job ... '{}'".format(test.__name__))
+            try:
+                test.main()
+                print("Job completed ('{}')\n".format(test.__name__))
+            except JobExecutionException as ex:
+                failed_tests.append(test.__name__)
+                print("\n{}\n{}\n{}\n".format('#'*len(ex.message), ex.message, 
'#'*len(ex.message)))
+            except:
+                failed_tests.append(test.__name__)
+                ex_type = sys.exc_info()[0]
+                print("\n{}\n{}\n{}\n".format('#'*len(ex_type), ex_type, 
'#'*len(ex_type)))
+
+        if failed_tests:
+            print("\nThe following tests were failed:")
+            for failed_test in failed_tests:
+                print("\t* " + failed_test)
+            raise Exception("\nFailed test(s): {}".format(failed_tests))
+        else:
+            print("\n*** All tests passed successfully ***")
+
+
+def main():
+    Main().run()
+
+
+if __name__ == "__main__":
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py
new file mode 100644
index 0000000..c32124e
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py
@@ -0,0 +1,82 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction, FilterFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.pygeneratorbase import PyGeneratorBase
+from utils.python_test_base import TestBase
+
+
+class Generator(PyGeneratorBase):
+    def __init__(self, num_iters):
+        super(Generator, self).__init__(num_iters)
+        self._alternator = True
+
+    def do(self, ctx):
+        ctx.collect('Hello' if self._alternator else 'World')
+        self._alternator = not self._alternator
+
+
+class Filterer(FilterFunction):
+    def filter(self, value):
+        return value == 'Hello'
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
+            .filter(Filterer()) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(100)) \
+            .reduce(Sum()) \
+            .print()
+
+        env.execute(True)
+
+def main():
+    Main().run()
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py
new file mode 100644
index 0000000..68ac3ef
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py
@@ -0,0 +1,77 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from utils import constants
+from utils.python_test_base import TestBase
+from utils.pygeneratorbase import PyGeneratorBase
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import seconds
+from org.apache.flink.streaming.api import CheckpointingMode
+
+
+class Generator(PyGeneratorBase):
+    def __init__(self, num_iters):
+        super(Generator, self).__init__(num_iters)
+
+    def do(self, ctx):
+        ctx.collect(222)
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE) \
+            
.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(seconds(1)) \
+            .reduce(Sum()) \
+            .print()
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(str(result.jobID)))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
new file mode 100644
index 0000000..6f6db53
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
@@ -0,0 +1,76 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import seconds
+
+from utils import constants
+from utils.pygeneratorbase import PyGeneratorBase
+from utils.python_test_base import TestBase
+
+
+class Generator(PyGeneratorBase):
+    def __init__(self, num_iters):
+        super(Generator, self).__init__(num_iters)
+
+    def do(self, ctx):
+        ctx.collect([222, 333])
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for v in value:
+            collector.collect((1, v))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(seconds(1)) \
+            .reduce(Sum()) \
+            .print()
+
+        env.execute(True)
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
new file mode 100644
index 0000000..9f5e7e5
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
@@ -0,0 +1,76 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.pygeneratorbase import PyGeneratorBase
+from utils.python_test_base import TestBase
+
+
+class Generator(PyGeneratorBase):
+    def __init__(self, num_iters):
+        super(Generator, self).__init__(num_iters)
+
+    def do(self, ctx):
+        ctx.collect('Hello World')
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(50)) \
+            .reduce(Sum()) \
+            .print()
+
+        env.execute(True)
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
new file mode 100644
index 0000000..67b1a42
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
@@ -0,0 +1,67 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.python_test_base import TestBase
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        elements = ["aa" if iii % 2 == 0 else "bbb" for iii in 
range(constants.NUM_ELEMENTS_IN_TEST)]
+
+        env = self._get_execution_environment()
+        env.from_collection(elements) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .print()
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(result.jobID))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
new file mode 100644
index 0000000..59f8cf7
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
@@ -0,0 +1,67 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.python_test_base import TestBase
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1][0]
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        elements = [("Alice", 111) if iii % 2 == 0 else ("Bob", 2222) for iii 
in range(constants.NUM_ELEMENTS_IN_TEST)]
+
+        env = self._get_execution_environment()
+        env.from_elements(*elements) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .print()
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(result.jobID))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
new file mode 100644
index 0000000..9e615d7
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
@@ -0,0 +1,84 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from java.util import NoSuchElementException
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.python.util import PythonIterator
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.python_test_base import TestBase
+
+
+class SomeIterator(PythonIterator):
+    def __init__(self, n):
+        self._iii = 0
+        self._nnn = n
+
+    def hasNext(self):
+        return self._iii < self._nnn
+
+    def next(self):
+        if self._iii < self._nnn:
+            i = self._iii
+            self._iii += 1
+            return 111 if i % 2 == 0 else 222
+        else:
+            raise NoSuchElementException()
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        env.from_collection(SomeIterator(constants.NUM_ITERATIONS_IN_TEST)) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .print()
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(result.jobID))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py
new file mode 100644
index 0000000..2e24464
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py
@@ -0,0 +1,65 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.python_test_base import TestBase
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return 1
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        env.generate_sequence(1, constants.NUM_ITERATIONS_IN_TEST) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .print()
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(result.jobID))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
new file mode 100644
index 0000000..eea8d5c
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
@@ -0,0 +1,69 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from org.apache.flink.api.common.functions import FilterFunction
+from org.apache.flink.api.common.functions import MapFunction
+
+from utils import constants
+from utils.python_test_base import TestBase
+
+
+class MinusOne(MapFunction):
+    def map(self, value):
+        return value - 1
+
+
+class PositiveNumber(FilterFunction):
+    def filter(self, value):
+        return value > 0
+
+
+class LessEquelToZero(FilterFunction):
+    def filter(self, value):
+        return value <= 0
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        some_integers = env.from_collection([2] * 5)
+
+        iterative_stream = 
some_integers.iterate(constants.MAX_EXECUTION_TIME_MS)
+
+        minus_one_stream = iterative_stream.map(MinusOne())
+
+        still_greater_then_zero = minus_one_stream.filter(PositiveNumber())
+
+        iterative_stream.close_with(still_greater_then_zero)
+
+        less_then_zero = minus_one_stream.filter(LessEquelToZero())
+
+        less_then_zero.print()
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(result.jobID))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
new file mode 100644
index 0000000..1be7c17
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
@@ -0,0 +1,157 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+import threading
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.api.java.utils import ParameterTool
+from org.apache.flink.streaming.python.connectors import 
PythonFlinkKafkaProducer09, PythonFlinkKafkaConsumer09
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+from org.apache.flink.streaming.util.serialization import DeserializationSchema
+from org.apache.flink.streaming.util.serialization import SerializationSchema
+
+from utils import constants
+from utils import utils
+from utils.python_test_base import TestBase
+
+KAFKA_DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"
+
+
+class StringGenerator(SourceFunction):
+    def __init__(self, msg, end_msg, num_iters=7000):
+        self._running = True
+        self._msg = msg
+        self._end_msg = end_msg
+        self._num_iters = num_iters
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters - 1:
+            counter += 1
+            ctx.collect(self._msg)
+        ctx.collect(self._end_msg)
+    def cancel(self):
+        self._running = False
+
+
+class ToStringSchema(SerializationSchema):
+    def serialize(self, value):
+        return str(value)
+
+
+class KafkaStringProducer(threading.Thread, TestBase):
+    def __init__(self, bootstrap_server, msg, end_msg, num_iters):
+        threading.Thread.__init__(self)
+        self._bootstrap_server = bootstrap_server
+        self._msg = msg
+        self._end_msg = end_msg
+        # if self._msg[-1] != '\n': self._msg += '\n'
+        # if self._end_msg[-1] != '\n': self._end_msg += '\n'
+        self._num_iters = num_iters
+
+    def run(self):
+        env = self._get_execution_environment()
+
+        stream = env.create_python_source(StringGenerator(self._msg, 
self._end_msg, num_iters=100))
+
+        producer = PythonFlinkKafkaProducer09(KAFKA_DEFAULT_BOOTSTRAP_SERVERS, 
"kafka09-test", ToStringSchema())
+        producer.set_log_failures_only(False);   # "False" by default
+        producer.set_flush_on_checkpoint(True);  # "True" by default
+
+        stream.add_sink(producer)
+
+        result = env.execute("Kafka09 producer test")
+        print("Kafka09 producer job completed, job_id={}".format(result.jobID))
+
+
+class StringDeserializationSchema(DeserializationSchema):
+    def deserialize(self, message):
+        return ''.join(map(chr,message))
+
+    def isEndOfStream(self, element):
+        return str(element) == "quit"
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class KafkaStringConsumer(threading.Thread, TestBase):
+    def __init__(self, bootstrap_server):
+        threading.Thread.__init__(self)
+        self._bootstrap_server = bootstrap_server
+
+    def run(self):
+        parameterTool = ParameterTool.fromArgs(sys.argv[1:])
+        props = parameterTool.getProperties()
+        props.setProperty("bootstrap.servers", self._bootstrap_server)
+
+        consumer = PythonFlinkKafkaConsumer09("kafka09-test", 
StringDeserializationSchema(), props)
+
+        env = self._get_execution_environment()
+        env.add_java_source(consumer) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(100)) \
+            .reduce(Sum()) \
+            .print()
+
+        result = env.execute("Python consumer kafka09 test", True)
+        print("Kafka09 consumer job completed, job_id={}".format(result.jobID))
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        host, port = KAFKA_DEFAULT_BOOTSTRAP_SERVERS.split(":")
+        if not utils.is_reachable(host, int(port)):
+            print("Kafka server is not reachable: 
[{}]".format(KAFKA_DEFAULT_BOOTSTRAP_SERVERS))
+            return
+
+        kafka_p = KafkaStringProducer(KAFKA_DEFAULT_BOOTSTRAP_SERVERS, "Hello 
World", "quit", constants.NUM_ITERATIONS_IN_TEST)
+        kafka_c = KafkaStringConsumer(KAFKA_DEFAULT_BOOTSTRAP_SERVERS)
+
+        kafka_p.start()
+        kafka_c.start()
+
+        kafka_p.join()
+        kafka_c.join()
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py
new file mode 100644
index 0000000..07a60c0
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py
@@ -0,0 +1,69 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from org.apache.flink.api.common.functions import ReduceFunction, 
FlatMapFunction
+from org.apache.flink.api.java.functions import KeySelector
+
+from utils import constants
+from utils.python_test_base import TestBase
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for v in value:
+            collector.collect((1, v))
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        elements = [(1, 222 if x % 2 == 0 else 333) for x in 
range(constants.NUM_ELEMENTS_IN_TEST)]
+
+        env = self._get_execution_environment()
+        env.set_parallelism(2) \
+            .from_elements(elements) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .reduce(Sum()) \
+            .print()
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(str(result.jobID)))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py
new file mode 100644
index 0000000..43984d0
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py
@@ -0,0 +1,83 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from utils.python_test_base import TestBase
+from utils.pygeneratorbase import PyGeneratorBase
+from org.apache.flink.api.common.functions import MapFunction, 
FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+
+class Generator(PyGeneratorBase):
+    def __init__(self, num_iters):
+        super(Generator, self).__init__(num_iters)
+
+    def do(self, ctx):
+        ctx.collect(222)
+
+class DummyTupple(MapFunction):
+    def map(self, value):
+        return (value, value)
+
+class MinusOne(MapFunction):
+    def map(self, value):
+        return value[0] - 1
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        env.from_collection([3] * 5) \
+            .map(DummyTupple()) \
+            .map(MinusOne()) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(5)) \
+            .reduce(Sum()) \
+            .print()
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(str(result.jobID)))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py
new file mode 100644
index 0000000..44e7bfc
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py
@@ -0,0 +1,77 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from org.apache.flink.api.common.functions import MapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import seconds
+
+from utils import constants
+from utils.pygeneratorbase import PyGeneratorBase
+from utils.python_test_base import TestBase
+
+
+class Generator(PyGeneratorBase):
+    def __init__(self, num_iters):
+        super(Generator, self).__init__(num_iters)
+        self._alternator = True
+
+    def do(self, ctx):
+        ctx.collect(10 if self._alternator else -10)
+        self._alternator = not self._alternator
+
+
+class Tokenizer(MapFunction):
+    def map(self, value):
+        return (1, 2 * value)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
+            .map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(seconds(1)) \
+            .reduce(Sum()) \
+            .print()
+
+        env.execute(True)
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py
new file mode 100644
index 0000000..36c86c9
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py
@@ -0,0 +1,82 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import re
+import sys
+import uuid
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.python_test_base import TestBase
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in re.sub(r'\s', '', value).split(','):
+            collector.collect((1, word))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+def generate_tmp_text_file(num_lines=100):
+    tmp_f = open("/tmp/{}".format(uuid.uuid4().get_hex()), 'w')
+    for iii in range(num_lines):
+        tmp_f.write('111, 222, 333, 444, 555, 666, 777\n')
+    return tmp_f
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        tmp_f = generate_tmp_text_file(constants.NUM_ELEMENTS_IN_TEST)
+        try:
+            env = self._get_execution_environment()
+            env.read_text_file(tmp_f.name) \
+                .flat_map(Tokenizer()) \
+                .key_by(Selector()) \
+                .time_window(milliseconds(100)) \
+                .reduce(Sum()) \
+                .print()
+
+            env.execute(True)
+        finally:
+            tmp_f.close()
+            os.unlink(tmp_f.name)
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py
new file mode 100644
index 0000000..726d69f
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py
@@ -0,0 +1,95 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import socket
+import sys
+import threading
+import time
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import seconds
+
+from utils import constants
+from utils import utils
+from utils.python_test_base import TestBase
+
+
+class SocketStringGenerator(threading.Thread):
+    def __init__(self, host, port, msg, num_iters):
+        threading.Thread.__init__(self)
+        self._host = host
+        self._port = port
+        self._msg = msg
+        if self._msg[-1] != '\n':
+            self._msg += '\n'
+        self._num_iters = num_iters
+
+    def run(self):
+        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        serversocket.bind((self._host, self._port))
+        serversocket.listen(5)
+        (clientsocket, address) = serversocket.accept()
+        for iii in range(self._num_iters):
+            clientsocket.send(self._msg)
+        clientsocket.close()
+        serversocket.close()
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        f_port = utils.gen_free_port()
+        SocketStringGenerator(host='', port=f_port, msg='Hello World', 
num_iters=constants.NUM_ITERATIONS_IN_TEST).start()
+        time.sleep(0.5)
+
+        env = self._get_execution_environment()
+        env.socket_text_stream('localhost', f_port) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(seconds(1)) \
+            .reduce(Sum()) \
+            .print()
+
+        env.execute(True)
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py
new file mode 100644
index 0000000..9052652
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py
@@ -0,0 +1,80 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.collector.selector import OutputSelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.python_test_base import TestBase
+
+
+class StreamSelector(OutputSelector):
+    def select(self, value):
+        return ['lower_stream'] if value < constants.NUM_ITERATIONS_IN_TEST / 
2 else ['upper_stream']
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return 1
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+
+        split_window = env.generate_sequence(1, 
constants.NUM_ITERATIONS_IN_TEST).split(StreamSelector())
+
+        split_window.select('lower_stream') \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .print()
+
+        split_window.select('upper_stream') \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .print()
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(result.jobID))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py
new file mode 100644
index 0000000..d70f87c
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py
@@ -0,0 +1,77 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.pygeneratorbase import PyGeneratorBase
+from utils.python_test_base import TestBase
+
+
+class Generator(PyGeneratorBase):
+    def __init__(self, msg, num_iters):
+        super(Generator, self).__init__(num_iters)
+        self._msg = msg
+    def do(self, ctx):
+        ctx.collect(self._msg)
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return 1
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        seq1 = env.create_python_source(Generator(msg='Hello', 
num_iters=constants.NUM_ITERATIONS_IN_TEST))
+        seq2 = env.create_python_source(Generator(msg='World', 
num_iters=constants.NUM_ITERATIONS_IN_TEST))
+        seq3 = env.create_python_source(Generator(msg='Happy', 
num_iters=constants.NUM_ITERATIONS_IN_TEST))
+
+        seq1.union(seq2, seq3) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .print()
+
+        result = env.execute("My python union stream test", True)
+        print("Job completed, job_id={}".format(result.jobID))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py
new file mode 100644
index 0000000..3ace000
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py
@@ -0,0 +1,86 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from org.apache.flink.api.common.functions import MapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.pygeneratorbase import PyGeneratorBase
+from utils.python_test_base import TestBase
+
+
+class Person:
+    def __init__(self, name, age):
+        self.name = name
+        self.age = age
+    def __repr__(self):
+        return "{} (age: {})".format(self.name, self.age)
+
+
+class Generator(PyGeneratorBase):
+    def __init__(self, num_iters):
+        super(Generator, self).__init__(num_iters)
+        self._alternator = True
+
+    def do(self, ctx):
+        person = Person("Avi", 47) if self._alternator else Person("Bob", 33)
+        ctx.collect(person)
+        self._alternator = not self._alternator
+
+
+class Tokenizer(MapFunction):
+    def map(self, value):
+        return (1, value)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1].name
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
+            .map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(30)) \
+            .reduce(Sum()) \
+            .print()
+
+        env.execute(True)
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py
new file mode 100644
index 0000000..8a651df
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.functions.windowing import WindowFunction
+from org.apache.flink.streaming.api.windowing.time.Time import seconds
+
+from utils import constants
+from utils.pygeneratorbase import PyGeneratorBase
+from utils.python_test_base import TestBase
+
+
+class Generator(PyGeneratorBase):
+    def __init__(self, num_iters):
+        super(Generator, self).__init__(num_iters)
+        self._alternator = True
+
+    def do(self, ctx):
+        if self._alternator:
+            ctx.collect(('key1', 'Any specific text here'))
+        else:
+            ctx.collect(('key2', 'Any other specific text here'))
+        self._alternator = not self._alternator
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[0]
+
+
+class WindowSum(WindowFunction):
+    def apply(self, key, window, values, collector):
+        collector.collect((key, len(values)))
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        env = self._get_execution_environment()
+        
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
+            .key_by(Selector()) \
+            .time_window(seconds(1)) \
+            .apply(WindowSum()) \
+            .print()
+
+        env.execute(True)
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py
new file mode 100644
index 0000000..baad7b3
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py
@@ -0,0 +1,84 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.python.api.environment import 
PythonStreamExecutionEnvironment
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+
+class Generator(SourceFunction):
+    def __init__(self, num_iters):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            ctx.collect('Hello World')
+            counter += 1
+
+    def cancel(self):
+        self._running = False
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+
+class Main:
+    def __init__(self, local):
+        self._local = local
+
+    def run(self):
+        env = PythonStreamExecutionEnvironment.get_execution_environment()
+        env.create_python_source(Generator(num_iters=100)) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(30)) \
+            .reduce(Sum()) \
+            .print()
+
+
+        print("Execution mode: {}".format("LOCAL" if self._local else 
"REMOTE"))
+        env.execute(self._local)
+
+
+def main():
+    local = False if len(sys.argv) > 1 and sys.argv[1] == "remote" else True
+    Main(local).run()
+
+
+if __name__ == '__main__':
+    main()
+    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py
new file mode 100644
index 0000000..9a34d06
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py
@@ -0,0 +1,68 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.core.fs.FileSystem import WriteMode
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+from utils.python_test_base import TestBase
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+    def run(self):
+        elements = ["aa" if iii % 2 == 0 else "bbb" for iii in 
range(constants.NUM_ITERATIONS_IN_TEST)]
+
+        env = self._get_execution_environment()
+        env.from_collection(elements) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .write_as_text("/tmp/flink_write_as_text", WriteMode.OVERWRITE)
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(result.jobID))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py
new file mode 100644
index 0000000..aa322ce
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py
@@ -0,0 +1,106 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import socket
+import threading
+import time
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+from org.apache.flink.streaming.util.serialization import SerializationSchema
+
+from utils import constants
+from utils import utils
+from utils.python_test_base import TestBase
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class ToStringSchema(SerializationSchema):
+    def serialize(self, value):
+        return "{}, {}|".format(value[0], value[1])
+
+
+class SocketStringReader(threading.Thread):
+    def __init__(self, host, port, expected_num_values):
+        threading.Thread.__init__(self)
+        self._host = host
+        self._port = port
+        self._expected_num_values = expected_num_values
+
+    def run(self):
+        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        serversocket.bind((self._host, self._port))
+        serversocket.listen(5)
+        (clientsocket, address) = serversocket.accept()
+        while True:
+            msg = clientsocket.recv(1024)
+            if not msg:
+                break
+            for v in msg.split('|')[:-1]:
+                print(v)
+
+        print("*** Done receiving ***")
+        clientsocket.close()
+        serversocket.close()
+
+
+class Main(TestBase):
+    def __init__(self):
+        super(Main, self).__init__()
+
+
+    def run(self):
+        port = utils.gen_free_port()
+        SocketStringReader('', port, constants.NUM_ITERATIONS_IN_TEST).start()
+        time.sleep(0.5)
+
+        elements = ["aa" if iii % 2 == 0 else "bbb" for iii in 
range(constants.NUM_ITERATIONS_IN_TEST)]
+
+        env = self._get_execution_environment()
+        env.from_collection(elements) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(50)) \
+            .reduce(Sum()) \
+            .write_to_socket('localhost', port, ToStringSchema())
+
+        result = env.execute("MyJob", True)
+        print("Job completed, job_id={}".format(result.jobID))
+
+
+def main():
+    Main().run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/__init__.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/__init__.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/constants.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/constants.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/constants.py
new file mode 100644
index 0000000..da1278c
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/constants.py
@@ -0,0 +1,21 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+NUM_ITERATIONS_IN_TEST = 100
+NUM_ELEMENTS_IN_TEST = 10
+MAX_EXECUTION_TIME_MS = 1000

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/pygeneratorbase.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/pygeneratorbase.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/pygeneratorbase.py
new file mode 100644
index 0000000..9dbda95
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/pygeneratorbase.py
@@ -0,0 +1,36 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+
+
+class PyGeneratorBase(SourceFunction):
+    def __init__(self, num_iters=7000):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            self.do(ctx)
+            counter += 1
+
+    def do(self, ctx):
+        pass
+
+    def cancel(self):
+        self._running = False

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py
new file mode 100644
index 0000000..7baef33
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py
@@ -0,0 +1,35 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from org.apache.flink.api.java.utils import ParameterTool
+from org.apache.flink.streaming.python.api.environment import 
PythonStreamExecutionEnvironment
+
+
+class TestBase(object):
+    _params = ParameterTool.fromArgs(sys.argv[1:]) if len(sys.argv[1:]) > 0 
else None
+
+    def __init__(self):
+        pass
+
+    def _get_execution_environment(self):
+        if TestBase._params:
+            print("Create local execution environment with provided 
configurations")
+            return 
PythonStreamExecutionEnvironment.create_local_execution_environment(TestBase._params.getConfiguration())
+        else:
+            print("Get execution environment")
+            return PythonStreamExecutionEnvironment.get_execution_environment()

Reply via email to