http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
index 2c8a257..c173389 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
@@ -15,9 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.util.serialization;
 
 import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
@@ -26,34 +28,31 @@ import org.python.core.PyObject;
 import java.io.IOException;
 
 /**
- * A serializer implementation for PyObject class type. Is is used by the Kryo 
serialization
- * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ * A {@link Serializer} implementation for {@link PyObject} class type.
  */
 public class PyObjectSerializer extends Serializer<PyObject> {
 
-       public void write (Kryo kryo, Output output, PyObject po) {
+       public void write(Kryo kryo, Output output, PyObject po) {
                try {
                        byte[] serPo = SerializationUtils.serializeObject(po);
                        output.writeInt(serPo.length);
                        output.write(serPo);
                } catch (IOException e) {
-                       e.printStackTrace();
+                       throw new KryoException("Failed to serialize object.", 
e);
                }
        }
 
-       public PyObject read (Kryo kryo, Input input, Class<PyObject> type) {
+       public PyObject read(Kryo kryo, Input input, Class<PyObject> type) {
                int len = input.readInt();
-               byte[] serPo = new byte[len];
-               input.read(serPo);
-               PyObject po = null;
+               byte[] serPo = input.readBytes(len);
                try {
-                       po = (PyObject) 
SerializationUtils.deserializeObject(serPo);
+                       return (PyObject) 
SerializationUtils.deserializeObject(serPo);
                } catch (IOException e) {
-                       e.printStackTrace();
+                       throw new KryoException("Failed to deserialize 
object.", e);
                } catch (ClassNotFoundException e) {
-                       e.printStackTrace();
+                       // this should only be possible if jython isn't on the 
class-path
+                       throw new KryoException("Failed to deserialize 
object.", e);
                }
-               return po;
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyStringSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyStringSerializer.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyStringSerializer.java
new file mode 100644
index 0000000..6988428
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyStringSerializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyString;
+
+/**
+ * A {@link Serializer} implementation for {@link PyString} class type.
+ */
+public class PyStringSerializer extends Serializer<PyString> {
+       @Override
+       public void write(Kryo kryo, Output output, PyString object) {
+               output.writeString(object.asString());
+       }
+
+       @Override
+       public PyString read(Kryo kryo, Input input, Class<PyString> type) {
+               return new PyString(input.readString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java
deleted file mode 100644
index a51db89..0000000
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.python.util.serialization;
-
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.io.IOException;
-
-/**
- * A Python deserialization schema, which implements {@link 
DeserializationSchema}. It converts a
- * serialized form of {@code PyObject} into its Java Object representation.
- */
-public class PythonDeserializationSchema implements 
DeserializationSchema<Object> {
-       private static final long serialVersionUID = -9180596504893036458L;
-       private final TypeInformation<Object> resultType = 
TypeInformation.of(new TypeHint<Object>(){});
-
-       private final byte[] serSchema;
-       private transient DeserializationSchema<Object> schema;
-
-       public PythonDeserializationSchema(DeserializationSchema<Object> 
schema) throws IOException {
-               this.serSchema = SerializationUtils.serializeObject(schema);
-       }
-
-       @SuppressWarnings("unchecked")
-       public Object deserialize(byte[] message) throws IOException {
-               if (this.schema == null) {
-                       try {
-                               this.schema = (DeserializationSchema<Object>) 
SerializationUtils.deserializeObject(this.serSchema);
-                       } catch (ClassNotFoundException e) {
-                               e.printStackTrace();
-                       }
-               }
-               return this.schema.deserialize(message);
-       }
-
-       @Override
-       public boolean isEndOfStream(Object nextElement) {
-               return this.schema.isEndOfStream(nextElement);
-       }
-
-       @Override
-       public TypeInformation<Object> getProducedType() {
-               return resultType;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
index a2dddaf..30f61c0 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
@@ -15,16 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.util.serialization;
 
-import java.io.IOException;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.util.FlinkRuntimeException;
 
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.python.core.PyObject;
 
+import java.io.IOException;
+
 /**
- * A Python serialization schema, which implements {@link 
SerializationSchema}. It converts
- * a {@code PyObject} into its serialized form.
+ * A {@link SerializationSchema} for {@link PyObject}s.
  */
 public class PythonSerializationSchema implements 
SerializationSchema<PyObject> {
        private static final long serialVersionUID = -9170596504893036458L;
@@ -41,11 +43,9 @@ public class PythonSerializationSchema implements 
SerializationSchema<PyObject>
        public byte[] serialize(PyObject element) {
                if (this.schema == null) {
                        try {
-                               this.schema = 
(SerializationSchema<PyObject>)SerializationUtils.deserializeObject(this.serSchema);
-                       } catch (IOException e) {
-                               e.printStackTrace();
-                       } catch (ClassNotFoundException e) {
-                               e.printStackTrace();
+                               this.schema = 
SerializationUtils.deserializeObject(this.serSchema);
+                       } catch (Exception e) {
+                               throw new FlinkRuntimeException("Schema could 
not be deserialized.", e);
                        }
                }
                return this.schema.serialize(element);

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java
index b7fc2a3..eaf11ea 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java
@@ -15,15 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.util.serialization;
 
-import 
org.apache.flink.streaming.python.api.datastream.PythonObjectInputStream2;
+import org.python.util.PythonObjectInputStream;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
 
 /**
  * A generic serialization utility, which turns java objects into their 
serialization form as
@@ -38,9 +41,52 @@ public class SerializationUtils {
                }
        }
 
-       public static Object deserializeObject(byte[] bytes) throws 
IOException, ClassNotFoundException {
-               try (ByteArrayInputStream bais = new 
ByteArrayInputStream(bytes); ObjectInputStream ois = new 
PythonObjectInputStream2(bais)) {
-                       return ois.readObject();
+       @SuppressWarnings("unchecked")
+       public static <X> X deserializeObject(byte[] bytes) throws IOException, 
ClassNotFoundException {
+               try (ByteArrayInputStream bais = new 
ByteArrayInputStream(bytes); ObjectInputStream ois = new 
SerialVersionOverridingPythonObjectInputStream(bais)) {
+                       return (X) ois.readObject();
+               }
+       }
+
+       /**
+        * A {@link PythonObjectInputStream} that ignores SerialVersionUID 
mismatches.
+        *
+        * <p>Classes generated by Jython have a non-deterministic 
serialVersionUID, which can prevent deserialization of
+        * user-defined python functions in a different JVM.
+        *
+        * <p>The serialVersionUID is calculated as a 64-bit hash of the class 
name, interface class names, methods, and fields.
+        * If a Python class inherits from a Java class, as in the case of 
Python UDFs, then a proxy wrapper class is created.
+        * Its name is constructed using the following pattern:
+        * <b>{@code 
org.python.proxies.<module-name>$<UDF-class-name>$<number>}</b>.
+        * The {@code <number>} part is increased by one at runtime, for every 
job submission. This results in different IDs for
+        * each run for the same Python class.
+        */
+       private static class SerialVersionOverridingPythonObjectInputStream 
extends PythonObjectInputStream {
+
+               private 
SerialVersionOverridingPythonObjectInputStream(InputStream in) throws 
IOException {
+                       super(in);
+               }
+
+               protected ObjectStreamClass readClassDescriptor() throws 
ClassNotFoundException, IOException {
+                       ObjectStreamClass readClassDescriptor = 
super.readClassDescriptor(); // the descriptor as read from the stream
+
+                       Class<?> localClass;
+                       try {
+                               localClass = resolveClass(readClassDescriptor);
+                       } catch (ClassNotFoundException e) {
+                               return readClassDescriptor;
+                       }
+
+                       ObjectStreamClass localClassDescriptor = 
ObjectStreamClass.lookup(localClass);
+                       if (localClassDescriptor != null) { // only if class 
implements serializable
+                               final long localSUID = 
localClassDescriptor.getSerialVersionUID();
+                               final long readSUID = 
readClassDescriptor.getSerialVersionUID();
+                               if (readSUID != localSUID) {
+                                       // Overriding serialized class version 
mismatch
+                                       readClassDescriptor = 
localClassDescriptor; // Use local class descriptor for deserialization
+                               }
+                       }
+                       return readClassDescriptor;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 
b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
index 5e4ec51..9b3ddd3 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
+++ 
b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
@@ -15,86 +15,77 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.streaming.python.api.PythonStreamBinder;
+import org.apache.flink.util.Preconditions;
+
+import org.python.core.PyException;
 
-import java.io.File;
-import java.io.FileNotFoundException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.ListIterator;
 
+/**
+ * Tests for the {@link PythonStreamBinder}.
+ */
 public class PythonStreamBinderTest extends StreamingProgramTestBase {
-       final private static String defaultPythonScriptName = 
"run_all_tests.py";
-       final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
-       final private static String pathToStreamingTests = 
"src/test/python/org/apache/flink/streaming/python/api";
-
-       public PythonStreamBinderTest() {
-       }
 
-       public static void main(String[] args) throws Exception {
-               if (args.length == 0) {
-                       args = prepareDefaultArgs();
-               } else {
-                       args[0] = findStreamTestFile(args[0]).getAbsolutePath();
-               }
-               PythonStreamBinder.main(args);
-       }
-
-       @Override
-       public void testProgram() throws Exception {
-               this.main(new String[]{});
+       private static Path getBaseTestPythonDir() {
+               FileSystem fs = new LocalFileSystem();
+               return new Path(fs.getWorkingDirectory(), 
"src/test/python/org/apache/flink/streaming/python/api");
        }
 
-       private static String[] prepareDefaultArgs() throws Exception {
-               File testFullPath = findStreamTestFile(defaultPythonScriptName);
-               List<String> filesInTestPath = 
getFilesInFolder(testFullPath.getParent());
-
-               String[] args = new String[filesInTestPath.size() + 1];
-               args[0] = testFullPath.getAbsolutePath();
-
-               for (final ListIterator<String> it = 
filesInTestPath.listIterator(); it.hasNext();) {
-                       final String p = it.next();
-                       args[it.previousIndex() + 1] = p;
-               }
-               return args;
+       private static Path findUtilsModule() {
+               return new Path(getBaseTestPythonDir(), "utils");
        }
 
-       private static File findStreamTestFile(String name) throws Exception {
-               if (new File(name).exists()) {
-                       return new File(name);
-               }
+       private static List<String> findTestFiles() throws Exception {
+               List<String> files = new ArrayList<>();
                FileSystem fs = FileSystem.getLocalFileSystem();
-               String workingDir = fs.getWorkingDirectory().getPath();
-               if (!workingDir.endsWith(flinkPythonRltvPath)) {
-                       workingDir += File.separator + flinkPythonRltvPath;
-               }
-               FileStatus[] status = fs.listStatus(
-                       new Path( workingDir + File.separator + 
pathToStreamingTests));
+               FileStatus[] status = fs.listStatus(getBaseTestPythonDir());
                for (FileStatus f : status) {
-                       String file_name = f.getPath().getName();
-                       if (file_name.equals(name)) {
-                               return new File(f.getPath().getPath());
+                       Path filePath = f.getPath();
+                       String fileName = filePath.getName();
+                       if (fileName.startsWith("test_") && 
fileName.endsWith(".py")) {
+                               files.add(filePath.getPath());
                        }
                }
-               throw new FileNotFoundException();
+               return files;
        }
 
-       private static List<String> getFilesInFolder(String path) {
-               List<String> results = new ArrayList<>();
-               File[] files = new File(path).listFiles();
-               if (files != null) {
-                       for (File file : files) {
-                               if (file.isDirectory() || 
file.getName().startsWith("test_")) {
-                                       results.add("." + File.separator + 
file.getName());
-                               }
+       @Override
+       public void testProgram() throws Exception {
+               Path testEntryPoint = new Path(getBaseTestPythonDir(), 
"examples/word_count.py");
+               List<String> testFiles = findTestFiles();
+
+               Preconditions.checkState(testFiles.size() > 0, "No test files 
were found in {}.", getBaseTestPythonDir());
+
+               String[] arguments = new String[1 + 1 + testFiles.size()];
+               arguments[0] = testEntryPoint.getPath();
+               arguments[1] = findUtilsModule().getPath();
+               int index = 2;
+               for (String testFile : testFiles) {
+                       arguments[index] = testFile;
+                       index++;
+               }
+               try {
+                       new PythonStreamBinder(new Configuration())
+                               .runPlan(arguments);
+               } catch (PyException e) {
+                       if (e.getCause() instanceof JobExecutionException) {
+                               // JobExecutionExceptions are wrapped again by 
the jython interpreter resulting in horrible stacktraces
+                               throw (JobExecutionException) e.getCause();
+                       } else {
+                               // probably caused by some issue in the main 
script itself
+                               throw e;
                        }
                }
-               return results;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/fibonacci.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/fibonacci.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/fibonacci.py
new file mode 100644
index 0000000..972e62f
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/fibonacci.py
@@ -0,0 +1,122 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+#
+# An example that illustrates iterations in Flink streaming. The program sums 
up random numbers and counts
+# additions. The operation is done until it reaches a specific threshold in an 
iterative streaming fashion.
+#
+
+import random
+import argparse
+
+from org.apache.flink.api.common.functions import MapFunction
+from org.apache.flink.streaming.api.collector.selector import OutputSelector
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+
+TARGET_VAL = 100
+MAX_INT_START = 50
+
+
+class Generator(SourceFunction):
+    def __init__(self, num_iters=1000):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            self.do(ctx)
+            counter += 1
+
+    def do(self, ctx):
+        two_numbers = "{}, {}".format(random.randrange(1, MAX_INT_START), 
random.randrange(1, MAX_INT_START))
+        ctx.collect(two_numbers)
+
+    def cancel(self):
+        self._running = False
+
+
+class Step(MapFunction):
+    def map(self, value):
+        return (value[0], value[1], value[3], value[2] + value[3], value[4] + 
1)
+
+
+class InputMap(MapFunction):
+    def map(self, value):
+        num1, num2 = value.split(",")
+
+        num1 = int(num1.strip())
+        num2 = int(num2.strip())
+
+        return (num1, num2, num1, num2, 0)
+
+
+class OutPut(MapFunction):
+    def map(self, value):
+        return ((value[0], value[1]), value[4])
+
+
+class Selector(OutputSelector):
+    def select(self, value):
+        return ["iterate"] if value[2] < TARGET_VAL and value[3] < TARGET_VAL 
else ["output"]
+
+
+class Main:
+    def run(self, flink, args):
+        env = flink.get_execution_environment()
+
+        # create input stream of integer pairs
+        if args.input:
+            input_stream = env.read_text_file(args.input)
+        else:
+            input_stream = env.create_python_source(Generator(num_iters=50))
+
+        # create an iterative data stream from the input with 5 second timeout
+        it = input_stream\
+            .map(InputMap())\
+            .iterate(5000)
+
+        # apply the step function to get the next Fibonacci number
+        # increment the counter and split the output with the output selector
+        step = it\
+            .map(Step())\
+            .split(Selector())
+
+        # close the iteration by selecting the tuples that were directed to the
+        # 'iterate' channel in the output selector
+        it.close_with(step.select("iterate"))
+
+        # to produce the final output select the tuples directed to the
+        # 'output' channel then get the input pairs that have the greatest 
iteration counter
+        # on a 1 second sliding window
+        output = step.select("output")
+        parsed_output = output.map(OutPut())
+        if args.output:
+            parsed_output.write_as_text(args.output)
+        else:
+            parsed_output.output()
+        result = env.execute("Fibonacci Example (py)")
+        print("Fibonacci job completed, job_id={}".format(result.jobID))
+
+
+def main(flink):
+    parser = argparse.ArgumentParser(description='Fibonacci.')
+    parser.add_argument('--input', metavar='IN', help='input file path')
+    parser.add_argument('--output', metavar='OUT', help='output file path')
+    args = parser.parse_args()
+    Main().run(flink, args)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py
new file mode 100644
index 0000000..2254898
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py
@@ -0,0 +1,122 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import argparse
+
+from org.apache.flink.api.common.functions import FlatMapFunction
+from org.apache.flink.api.common.functions import ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+
+TARGET_VAL = 100
+MAX_INT_START = 50
+
+default_input_data = [
+    "To be, or not to be,--that is the question:--",
+    "Whether 'tis nobler in the mind to suffer",
+    "The slings and arrows of outrageous fortune",
+    "Or to take arms against a sea of troubles,",
+    "And by opposing end them?--To die,--to sleep,--",
+    "No more; and by a sleep to say we end",
+    "The heartache, and the thousand natural shocks",
+    "That flesh is heir to,--'tis a consummation",
+    "Devoutly to be wish'd. To die,--to sleep;--",
+    "To sleep! perchance to dream:--ay, there's the rub;",
+    "For in that sleep of death what dreams may come,",
+    "When we have shuffled off this mortal coil,",
+    "Must give us pause: there's the respect",
+    "That makes calamity of so long life;",
+    "For who would bear the whips and scorns of time,",
+    "The oppressor's wrong, the proud man's contumely,",
+    "The pangs of despis'd love, the law's delay,",
+    "The insolence of office, and the spurns",
+    "That patient merit of the unworthy takes,",
+    "When he himself might his quietus make",
+    "With a bare bodkin? who would these fardels bear,",
+    "To grunt and sweat under a weary life,",
+    "But that the dread of something after death,--",
+    "The undiscover'd country, from whose bourn",
+    "No traveller returns,--puzzles the will,",
+    "And makes us rather bear those ills we have",
+    "Than fly to others that we know not of?",
+    "Thus conscience does make cowards of us all;",
+    "And thus the native hue of resolution",
+    "Is sicklied o'er with the pale cast of thought;",
+    "And enterprises of great pith and moment,",
+    "With this regard, their currents turn awry,",
+    "And lose the name of action.--Soft you now!",
+    "The fair Ophelia!--Nymph, in thy orisons",
+    "Be all my sins remember'd."
+]
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, value1, value2):
+        return (value1[0] + value2[0], value1[1])
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main:
+    def run(self, flink, args):
+        env = flink.get_execution_environment()
+
+        if args.input:
+            text = env.read_text_file(args.input)
+        else:
+            text = env.from_collection(default_input_data)
+
+        counts = text \
+            .flat_map(Tokenizer())\
+            .key_by(Selector())\
+            .reduce(Sum())
+
+        if args.output:
+            counts.write_as_text(args.output)
+        else:
+            counts.output()
+        env.execute("Wordcount Example (py)")
+
+"""
+Implements the "WordCount" program that computes a simple word occurrence
+histogram over text files in a streaming fashion.
+
+The input is a plain text file with lines separated by newline characters.
+
+Usage: WordCount --input <path> --output <path>
+If no parameters are provided, the program is run with default data.
+
+This example shows how to:
+  * write a simple Flink Streaming program,
+  * use tuple data types,
+  * write and use user-defined functions.
+"""
+def main(flink):
+    parser = argparse.ArgumentParser(description='WordCount.')
+    parser.add_argument('--input', metavar='IN', help='input file path')
+    parser.add_argument('--output', metavar='OUT', help='output file path')
+    args = parser.parse_args()
+    Main().run(flink, args)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py
index 1970918..b8378cc 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/run_all_tests.py
@@ -16,10 +16,9 @@
 # limitations under the License.
 
################################################################################
 import sys
-from os.path import dirname, join, basename
+import traceback
 from glob import glob
-from org.apache.flink.runtime.client import JobExecutionException
-
+from os.path import dirname, join, basename
 
 excluded_tests = [
     'test_kafka09',
@@ -30,46 +29,30 @@ class Main:
     def __init__(self):
         pass
 
-    def run(self):
+    def run(self, flink):
         tests = []
-        pwd = dirname(sys.argv[0])
-        print("Working directory: {}".format(pwd))
+
+        current_dir = dirname(sys.modules[__name__].__file__)
+        print("Working directory: {}".format(current_dir))
 
         if excluded_tests:
             print("Excluded tests: {}\n".format(excluded_tests))
 
-        for x in glob(join(pwd, 'test_*.py')):
+        for x in glob(join(current_dir, 'test_*.py')):
             if not x.startswith('__'):
                 test_module_name = basename(x)[:-3]
                 if test_module_name not in excluded_tests:
                    tests.append(__import__(test_module_name, globals(), locals()))
 
-        failed_tests = []
-        for test in tests:
-            print("Submitting job ... '{}'".format(test.__name__))
-            try:
-                test.main()
+        try:
+            for test in tests:
+                print("Submitting job ... '{}'".format(test.__name__))
+                test.main(flink)
                 print("Job completed ('{}')\n".format(test.__name__))
-            except JobExecutionException as ex:
-                failed_tests.append(test.__name__)
-                print("\n{}\n{}\n{}\n".format('#'*len(ex.message), ex.message, '#'*len(ex.message)))
-            except:
-                failed_tests.append(test.__name__)
-                ex_type = sys.exc_info()[0]
-                print("\n{}\n{}\n{}\n".format('#'*len(ex_type), ex_type, '#'*len(ex_type)))
-
-        if failed_tests:
-            print("\nThe following tests were failed:")
-            for failed_test in failed_tests:
-                print("\t* " + failed_test)
-            raise Exception("\nFailed test(s): {}".format(failed_tests))
-        else:
-            print("\n*** All tests passed successfully ***")
-
-
-def main():
-    Main().run()
+        except Exception as ex:
-            print("Test {} has failed\n".format(test.__name__))
+            traceback.print_exc()
 
 
-if __name__ == "__main__":
-    main()
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py
index c32124e..6f25deb 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_filter.py
@@ -15,14 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import sys
 from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction, FilterFunction
 from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
 from utils import constants
 from utils.pygeneratorbase import PyGeneratorBase
-from utils.python_test_base import TestBase
 
 
 class Generator(PyGeneratorBase):
@@ -58,25 +56,19 @@ class Selector(KeySelector):
         return input[1]
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
             .filter(Filterer()) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(milliseconds(100)) \
             .reduce(Sum()) \
-            .print()
+            .output()
 
-        env.execute(True)
+        env.execute()
 
-def main():
-    Main().run()
+def main(flink):
+    Main().run(flink)
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py
index 68ac3ef..e8ef68a 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_int.py
@@ -15,9 +15,8 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import sys
 from utils import constants
-from utils.python_test_base import TestBase
+
 from utils.pygeneratorbase import PyGeneratorBase
 from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
 from org.apache.flink.api.java.functions import KeySelector
@@ -50,28 +49,19 @@ class Selector(KeySelector):
         return input[1]
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE) \
             
.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(seconds(1)) \
             .reduce(Sum()) \
-            .print()
-
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(str(result.jobID)))
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
index 6f6db53..a550985 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_list_int.py
@@ -15,14 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import sys
 from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
 from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.windowing.time.Time import seconds
 
 from utils import constants
 from utils.pygeneratorbase import PyGeneratorBase
-from utils.python_test_base import TestBase
 
 
 class Generator(PyGeneratorBase):
@@ -51,26 +49,18 @@ class Selector(KeySelector):
         return input[1]
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(seconds(1)) \
             .reduce(Sum()) \
-            .print()
-
-        env.execute(True)
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
index 9f5e7e5..e7e2a77 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_flatmap_string.py
@@ -15,14 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import sys
 from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
 from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
 from utils import constants
 from utils.pygeneratorbase import PyGeneratorBase
-from utils.python_test_base import TestBase
 
 
 class Generator(PyGeneratorBase):
@@ -51,26 +49,18 @@ class Sum(ReduceFunction):
         return (count1 + count2, word1)
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(milliseconds(50)) \
             .reduce(Sum()) \
-            .print()
-
-        env.execute(True)
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
index 67b1a42..82cfae1 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_collection.py
@@ -20,7 +20,6 @@ from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
 from utils import constants
-from utils.python_test_base import TestBase
 
 
 class Tokenizer(FlatMapFunction):
@@ -40,28 +39,20 @@ class Selector(KeySelector):
         return input[1]
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
+class Main:
+    def run(self, flink):
         elements = ["aa" if iii % 2 == 0 else "bbb" for iii in 
range(constants.NUM_ELEMENTS_IN_TEST)]
 
-        env = self._get_execution_environment()
+        env = flink.get_execution_environment()
         env.from_collection(elements) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(milliseconds(10)) \
             .reduce(Sum()) \
-            .print()
-
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(result.jobID))
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
index 59f8cf7..d4ebc2a 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_elements.py
@@ -20,7 +20,6 @@ from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
 from utils import constants
-from utils.python_test_base import TestBase
 
 
 class Tokenizer(FlatMapFunction):
@@ -40,28 +39,20 @@ class Selector(KeySelector):
         return input[1][0]
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
+class Main:
+    def run(self, flink):
         elements = [("Alice", 111) if iii % 2 == 0 else ("Bob", 2222) for iii 
in range(constants.NUM_ELEMENTS_IN_TEST)]
 
-        env = self._get_execution_environment()
+        env = flink.get_execution_environment()
         env.from_elements(*elements) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(milliseconds(10)) \
             .reduce(Sum()) \
-            .print()
-
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(result.jobID))
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
index 9e615d7..328efc8 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_from_iterator.py
@@ -22,7 +22,6 @@ from org.apache.flink.streaming.python.util import 
PythonIterator
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
 from utils import constants
-from utils.python_test_base import TestBase
 
 
 class SomeIterator(PythonIterator):
@@ -59,26 +58,18 @@ class Selector(KeySelector):
         return input[1]
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         env.from_collection(SomeIterator(constants.NUM_ITERATIONS_IN_TEST)) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(milliseconds(10)) \
             .reduce(Sum()) \
-            .print()
-
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(result.jobID))
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py
index 2e24464..3edc7b8 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_generate_sequence.py
@@ -20,7 +20,6 @@ from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
 from utils import constants
-from utils.python_test_base import TestBase
 
 
 class Tokenizer(FlatMapFunction):
@@ -40,26 +39,18 @@ class Selector(KeySelector):
         return 1
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         env.generate_sequence(1, constants.NUM_ITERATIONS_IN_TEST) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(milliseconds(10)) \
             .reduce(Sum()) \
-            .print()
-
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(result.jobID))
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
index eea8d5c..6756508 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_iterations.py
@@ -19,7 +19,6 @@ from org.apache.flink.api.common.functions import 
FilterFunction
 from org.apache.flink.api.common.functions import MapFunction
 
 from utils import constants
-from utils.python_test_base import TestBase
 
 
 class MinusOne(MapFunction):
@@ -37,12 +36,9 @@ class LessEquelToZero(FilterFunction):
         return value <= 0
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         some_integers = env.from_collection([2] * 5)
 
         iterative_stream = 
some_integers.iterate(constants.MAX_EXECUTION_TIME_MS)
@@ -55,15 +51,10 @@ class Main(TestBase):
 
         less_then_zero = minus_one_stream.filter(LessEquelToZero())
 
-        less_then_zero.print()
-
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(result.jobID))
-
+        less_then_zero.output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
deleted file mode 100644
index 1be7c17..0000000
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_kafka09.py
+++ /dev/null
@@ -1,157 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import sys
-import threading
-from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
-from org.apache.flink.api.java.functions import KeySelector
-from org.apache.flink.api.java.utils import ParameterTool
-from org.apache.flink.streaming.python.connectors import 
PythonFlinkKafkaProducer09, PythonFlinkKafkaConsumer09
-from org.apache.flink.streaming.api.functions.source import SourceFunction
-from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
-from org.apache.flink.streaming.util.serialization import DeserializationSchema
-from org.apache.flink.streaming.util.serialization import SerializationSchema
-
-from utils import constants
-from utils import utils
-from utils.python_test_base import TestBase
-
-KAFKA_DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"
-
-
-class StringGenerator(SourceFunction):
-    def __init__(self, msg, end_msg, num_iters=7000):
-        self._running = True
-        self._msg = msg
-        self._end_msg = end_msg
-        self._num_iters = num_iters
-    def run(self, ctx):
-        counter = 0
-        while self._running and counter < self._num_iters - 1:
-            counter += 1
-            ctx.collect(self._msg)
-        ctx.collect(self._end_msg)
-    def cancel(self):
-        self._running = False
-
-
-class ToStringSchema(SerializationSchema):
-    def serialize(self, value):
-        return str(value)
-
-
-class KafkaStringProducer(threading.Thread, TestBase):
-    def __init__(self, bootstrap_server, msg, end_msg, num_iters):
-        threading.Thread.__init__(self)
-        self._bootstrap_server = bootstrap_server
-        self._msg = msg
-        self._end_msg = end_msg
-        # if self._msg[-1] != '\n': self._msg += '\n'
-        # if self._end_msg[-1] != '\n': self._end_msg += '\n'
-        self._num_iters = num_iters
-
-    def run(self):
-        env = self._get_execution_environment()
-
-        stream = env.create_python_source(StringGenerator(self._msg, 
self._end_msg, num_iters=100))
-
-        producer = PythonFlinkKafkaProducer09(KAFKA_DEFAULT_BOOTSTRAP_SERVERS, 
"kafka09-test", ToStringSchema())
-        producer.set_log_failures_only(False);   # "False" by default
-        producer.set_flush_on_checkpoint(True);  # "True" by default
-
-        stream.add_sink(producer)
-
-        result = env.execute("Kafka09 producer test")
-        print("Kafka09 producer job completed, job_id={}".format(result.jobID))
-
-
-class StringDeserializationSchema(DeserializationSchema):
-    def deserialize(self, message):
-        return ''.join(map(chr,message))
-
-    def isEndOfStream(self, element):
-        return str(element) == "quit"
-
-
-class Tokenizer(FlatMapFunction):
-    def flatMap(self, value, collector):
-        for word in value.lower().split():
-            collector.collect((1, word))
-
-
-class Sum(ReduceFunction):
-    def reduce(self, input1, input2):
-        count1, val1 = input1
-        count2, val2 = input2
-        return (count1 + count2, val1)
-
-
-class Selector(KeySelector):
-    def getKey(self, input):
-        return input[1]
-
-
-class KafkaStringConsumer(threading.Thread, TestBase):
-    def __init__(self, bootstrap_server):
-        threading.Thread.__init__(self)
-        self._bootstrap_server = bootstrap_server
-
-    def run(self):
-        parameterTool = ParameterTool.fromArgs(sys.argv[1:])
-        props = parameterTool.getProperties()
-        props.setProperty("bootstrap.servers", self._bootstrap_server)
-
-        consumer = PythonFlinkKafkaConsumer09("kafka09-test", 
StringDeserializationSchema(), props)
-
-        env = self._get_execution_environment()
-        env.add_java_source(consumer) \
-            .flat_map(Tokenizer()) \
-            .key_by(Selector()) \
-            .time_window(milliseconds(100)) \
-            .reduce(Sum()) \
-            .print()
-
-        result = env.execute("Python consumer kafka09 test", True)
-        print("Kafka09 consumer job completed, job_id={}".format(result.jobID))
-
-
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        host, port = KAFKA_DEFAULT_BOOTSTRAP_SERVERS.split(":")
-        if not utils.is_reachable(host, int(port)):
-            print("Kafka server is not reachable: 
[{}]".format(KAFKA_DEFAULT_BOOTSTRAP_SERVERS))
-            return
-
-        kafka_p = KafkaStringProducer(KAFKA_DEFAULT_BOOTSTRAP_SERVERS, "Hello 
World", "quit", constants.NUM_ITERATIONS_IN_TEST)
-        kafka_c = KafkaStringConsumer(KAFKA_DEFAULT_BOOTSTRAP_SERVERS)
-
-        kafka_p.start()
-        kafka_c.start()
-
-        kafka_p.join()
-        kafka_c.join()
-
-
-def main():
-    Main().run()
-
-
-if __name__ == '__main__':
-    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py
index 07a60c0..fd5f25d 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_keyed_stream_reduce.py
@@ -15,12 +15,10 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import sys
 from org.apache.flink.api.common.functions import ReduceFunction, 
FlatMapFunction
 from org.apache.flink.api.java.functions import KeySelector
 
 from utils import constants
-from utils.python_test_base import TestBase
 
 
 class Tokenizer(FlatMapFunction):
@@ -41,29 +39,20 @@ class Sum(ReduceFunction):
         return (count1 + count2, val1)
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
+class Main:
+    def run(self, flink):
         elements = [(1, 222 if x % 2 == 0 else 333) for x in 
range(constants.NUM_ELEMENTS_IN_TEST)]
 
-        env = self._get_execution_environment()
+        env = flink.get_execution_environment()
         env.set_parallelism(2) \
             .from_elements(elements) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .reduce(Sum()) \
-            .print()
-
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(str(result.jobID)))
-
+            .output()
 
-def main():
-    Main().run()
+        result = env.execute()
 
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py
index 43984d0..1e97d05 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map.py
@@ -15,8 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import sys
-from utils.python_test_base import TestBase
 from utils.pygeneratorbase import PyGeneratorBase
 from org.apache.flink.api.common.functions import MapFunction, 
FlatMapFunction, ReduceFunction
 from org.apache.flink.api.java.functions import KeySelector
@@ -55,12 +53,9 @@ class Selector(KeySelector):
         return input[1]
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         env.from_collection([3] * 5) \
             .map(DummyTupple()) \
             .map(MinusOne()) \
@@ -68,16 +63,10 @@ class Main(TestBase):
             .key_by(Selector()) \
             .time_window(milliseconds(5)) \
             .reduce(Sum()) \
-            .print()
-
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(str(result.jobID)))
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py
index 44e7bfc..c2b0b5b 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_map_int.py
@@ -15,14 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import sys
 from org.apache.flink.api.common.functions import MapFunction, ReduceFunction
 from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.windowing.time.Time import seconds
 
 from utils import constants
 from utils.pygeneratorbase import PyGeneratorBase
-from utils.python_test_base import TestBase
 
 
 class Generator(PyGeneratorBase):
@@ -52,26 +50,18 @@ class Sum(ReduceFunction):
         return (count1 + count2, val1)
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
             .map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(seconds(1)) \
             .reduce(Sum()) \
-            .print()
-
-        env.execute(True)
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py
index 36c86c9..76d6c38 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_read_text_file.py
@@ -17,14 +17,12 @@
 
################################################################################
 import os
 import re
-import sys
 import uuid
 from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
 from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
 from utils import constants
-from utils.python_test_base import TestBase
 
 
 class Tokenizer(FlatMapFunction):
@@ -52,31 +50,23 @@ def generate_tmp_text_file(num_lines=100):
     return tmp_f
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
+class Main:
+    def run(self, flink):
         tmp_f = generate_tmp_text_file(constants.NUM_ELEMENTS_IN_TEST)
         try:
-            env = self._get_execution_environment()
+            env = flink.get_execution_environment()
             env.read_text_file(tmp_f.name) \
                 .flat_map(Tokenizer()) \
                 .key_by(Selector()) \
                 .time_window(milliseconds(100)) \
                 .reduce(Sum()) \
-                .print()
+                .output()
 
-            env.execute(True)
+            env.execute()
         finally:
             tmp_f.close()
             os.unlink(tmp_f.name)
 
 
-def main():
-    Main().run()
-
-
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py
index 726d69f..54a1be4 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_socket_text_stream.py
@@ -16,16 +16,13 @@
 # limitations under the License.
 
################################################################################
 import socket
-import sys
 import threading
 import time
 from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
 from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.windowing.time.Time import seconds
 
-from utils import constants
-from utils import utils
-from utils.python_test_base import TestBase
+from utils import constants, utils
 
 
 class SocketStringGenerator(threading.Thread):
@@ -66,30 +63,22 @@ class Selector(KeySelector):
     def getKey(self, input):
         return input[1]
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
+class Main:
+    def run(self, flink):
         f_port = utils.gen_free_port()
         SocketStringGenerator(host='', port=f_port, msg='Hello World', 
num_iters=constants.NUM_ITERATIONS_IN_TEST).start()
         time.sleep(0.5)
 
-        env = self._get_execution_environment()
+        env = flink.get_execution_environment()
         env.socket_text_stream('localhost', f_port) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(seconds(1)) \
             .reduce(Sum()) \
-            .print()
-
-        env.execute(True)
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py
index 9052652..7461c72 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_split_select.py
@@ -21,7 +21,6 @@ from org.apache.flink.streaming.api.collector.selector import 
OutputSelector
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
 from utils import constants
-from utils.python_test_base import TestBase
 
 
 class StreamSelector(OutputSelector):
@@ -45,12 +44,9 @@ class Selector(KeySelector):
     def getKey(self, input):
         return 1
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
 
         split_window = env.generate_sequence(1, 
constants.NUM_ITERATIONS_IN_TEST).split(StreamSelector())
 
@@ -59,22 +55,17 @@ class Main(TestBase):
             .key_by(Selector()) \
             .time_window(milliseconds(10)) \
             .reduce(Sum()) \
-            .print()
+            .output()
 
         split_window.select('upper_stream') \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(milliseconds(10)) \
             .reduce(Sum()) \
-            .print()
-
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(result.jobID))
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py
index d70f87c..e2ed671 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_union.py
@@ -21,7 +21,6 @@ from org.apache.flink.streaming.api.windowing.time.Time 
import milliseconds
 
 from utils import constants
 from utils.pygeneratorbase import PyGeneratorBase
-from utils.python_test_base import TestBase
 
 
 class Generator(PyGeneratorBase):
@@ -48,12 +47,9 @@ class Selector(KeySelector):
     def getKey(self, input):
         return 1
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         seq1 = env.create_python_source(Generator(msg='Hello', 
num_iters=constants.NUM_ITERATIONS_IN_TEST))
         seq2 = env.create_python_source(Generator(msg='World', 
num_iters=constants.NUM_ITERATIONS_IN_TEST))
         seq3 = env.create_python_source(Generator(msg='Happy', 
num_iters=constants.NUM_ITERATIONS_IN_TEST))
@@ -63,15 +59,10 @@ class Main(TestBase):
             .key_by(Selector()) \
             .time_window(milliseconds(10)) \
             .reduce(Sum()) \
-            .print()
-
-        result = env.execute("My python union stream test", True)
-        print("Job completed, job_id={}".format(result.jobID))
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py
index 3ace000..1d409b5 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_user_type.py
@@ -15,14 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import sys
 from org.apache.flink.api.common.functions import MapFunction, ReduceFunction
 from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
 from utils import constants
 from utils.pygeneratorbase import PyGeneratorBase
-from utils.python_test_base import TestBase
 
 
 class Person:
@@ -61,26 +59,18 @@ class Sum(ReduceFunction):
         return (count1 + count2, val1)
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
             .map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(milliseconds(30)) \
             .reduce(Sum()) \
-            .print()
-
-        env.execute(True)
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py
index 8a651df..98e22c5 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_window_apply.py
@@ -15,14 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import sys
 from org.apache.flink.api.java.functions import KeySelector
 from org.apache.flink.streaming.api.functions.windowing import WindowFunction
 from org.apache.flink.streaming.api.windowing.time.Time import seconds
 
 from utils import constants
 from utils.pygeneratorbase import PyGeneratorBase
-from utils.python_test_base import TestBase
 
 
 class Generator(PyGeneratorBase):
@@ -48,25 +46,17 @@ class WindowSum(WindowFunction):
         collector.collect((key, len(values)))
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
-        env = self._get_execution_environment()
+class Main:
+    def run(self, flink):
+        env = flink.get_execution_environment()
         
env.create_python_source(Generator(num_iters=constants.NUM_ITERATIONS_IN_TEST)) 
\
             .key_by(Selector()) \
             .time_window(seconds(1)) \
             .apply(WindowSum()) \
-            .print()
-
-        env.execute(True)
-
+            .output()
 
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py
index baad7b3..82ace05 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_word_count.py
@@ -15,10 +15,8 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import sys
 from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
 from org.apache.flink.api.java.functions import KeySelector
-from org.apache.flink.streaming.python.api.environment import 
PythonStreamExecutionEnvironment
 from org.apache.flink.streaming.api.functions.source import SourceFunction
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
@@ -57,28 +55,17 @@ class Sum(ReduceFunction):
 
 
 class Main:
-    def __init__(self, local):
-        self._local = local
-
-    def run(self):
-        env = PythonStreamExecutionEnvironment.get_execution_environment()
+    def run(self, flink):
+        env = flink.get_execution_environment()
         env.create_python_source(Generator(num_iters=100)) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
             .time_window(milliseconds(30)) \
             .reduce(Sum()) \
-            .print()
-
-
-        print("Execution mode: {}".format("LOCAL" if self._local else 
"REMOTE"))
-        env.execute(self._local)
-
+            .output()
 
-def main():
-    local = False if len(sys.argv) > 1 and sys.argv[1] == "remote" else True
-    Main(local).run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
-    print("Job completed ({})\n".format(sys.argv))
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py
index 9a34d06..1694c05 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_as_text.py
@@ -21,7 +21,6 @@ from org.apache.flink.core.fs.FileSystem import WriteMode
 from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
 
 from utils import constants
-from utils.python_test_base import TestBase
 
 
 class Tokenizer(FlatMapFunction):
@@ -41,14 +40,11 @@ class Selector(KeySelector):
         return input[1]
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-    def run(self):
+class Main:
+    def run(self, flink):
         elements = ["aa" if iii % 2 == 0 else "bbb" for iii in 
range(constants.NUM_ITERATIONS_IN_TEST)]
 
-        env = self._get_execution_environment()
+        env = flink.get_execution_environment()
         env.from_collection(elements) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
@@ -56,13 +52,8 @@ class Main(TestBase):
             .reduce(Sum()) \
             .write_as_text("/tmp/flink_write_as_text", WriteMode.OVERWRITE)
 
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(result.jobID))
-
-
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py
index aa322ce..9e8af9d 100644
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py
+++ 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_write_to_socket.py
@@ -25,7 +25,6 @@ from org.apache.flink.streaming.util.serialization import 
SerializationSchema
 
 from utils import constants
 from utils import utils
-from utils.python_test_base import TestBase
 
 
 class Tokenizer(FlatMapFunction):
@@ -74,19 +73,15 @@ class SocketStringReader(threading.Thread):
         serversocket.close()
 
 
-class Main(TestBase):
-    def __init__(self):
-        super(Main, self).__init__()
-
-
-    def run(self):
+class Main:
+    def run(self, flink):
         port = utils.gen_free_port()
         SocketStringReader('', port, constants.NUM_ITERATIONS_IN_TEST).start()
         time.sleep(0.5)
 
         elements = ["aa" if iii % 2 == 0 else "bbb" for iii in 
range(constants.NUM_ITERATIONS_IN_TEST)]
 
-        env = self._get_execution_environment()
+        env = flink.get_execution_environment()
         env.from_collection(elements) \
             .flat_map(Tokenizer()) \
             .key_by(Selector()) \
@@ -94,13 +89,8 @@ class Main(TestBase):
             .reduce(Sum()) \
             .write_to_socket('localhost', port, ToStringSchema())
 
-        result = env.execute("MyJob", True)
-        print("Job completed, job_id={}".format(result.jobID))
-
-
-def main():
-    Main().run()
+        env.execute()
 
 
-if __name__ == '__main__':
-    main()
+def main(flink):
+    Main().run(flink)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py
 
b/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py
deleted file mode 100644
index 7baef33..0000000
--- 
a/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py
+++ /dev/null
@@ -1,35 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import sys
-from org.apache.flink.api.java.utils import ParameterTool
-from org.apache.flink.streaming.python.api.environment import 
PythonStreamExecutionEnvironment
-
-
-class TestBase(object):
-    _params = ParameterTool.fromArgs(sys.argv[1:]) if len(sys.argv[1:]) > 0 
else None
-
-    def __init__(self):
-        pass
-
-    def _get_execution_environment(self):
-        if TestBase._params:
-            print("Create local execution environment with provided 
configurations")
-            return 
PythonStreamExecutionEnvironment.create_local_execution_environment(TestBase._params.getConfiguration())
-        else:
-            print("Get execution environment")
-            return PythonStreamExecutionEnvironment.get_execution_environment()

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/resources/log4j-test.properties
 
b/flink-libraries/flink-streaming-python/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

Reply via email to