http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
new file mode 100644
index 0000000..6710bf1
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.environment;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
+import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
+import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.core.PyInteger;
+import org.python.core.PyLong;
+import org.python.core.PyUnicode;
+import org.python.core.PyTuple;
+import org.python.core.PyObjectDerived;
+import org.python.core.PyInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+
+/**
+ * A thin wrapper layer over {@link StreamExecutionEnvironment}.
+ *
+ * <p>The PythonStreamExecutionEnvironment is the context in which a streaming 
program is executed.
+ * </p>
+ *
+ * <p>The environment provides methods to control the job execution (such as 
setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the 
outside world
+ * (data access).</p>
+ */
@PublicEvolving
public class PythonStreamExecutionEnvironment {
	private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);

	/** The wrapped Java streaming environment; every operation below delegates to it. */
	private final StreamExecutionEnvironment env;

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
	 * care of the required Jython serializers registration.
	 *
	 * @return The python execution environment of the context in which the program is
	 * executed.
	 */
	public static PythonStreamExecutionEnvironment get_execution_environment() {
		return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
	}

	/**
	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
	 * will run the program in a multi-threaded fashion in the same JVM as the
	 * environment was created in. The default parallelism of the local
	 * environment is the number of hardware contexts (CPU cores / threads),
	 * unless it was specified differently by {@link #set_parallelism(int)}.
	 *
	 * @param config
	 *		Pass a custom configuration into the cluster
	 * @return A local execution environment with the specified parallelism.
	 */
	public static PythonStreamExecutionEnvironment create_local_execution_environment(Configuration config) {
		return new PythonStreamExecutionEnvironment(new LocalStreamEnvironment(config));
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#createLocalEnvironment(int, Configuration)}
	 *
	 * @param parallelism
	 *		The parallelism for the local environment.
	 * @param config
	 *		Pass a custom configuration into the cluster
	 * @return A local python execution environment with the specified parallelism.
	 */
	public static PythonStreamExecutionEnvironment create_local_execution_environment(int parallelism, Configuration config) {
		return new PythonStreamExecutionEnvironment(
						StreamExecutionEnvironment.createLocalEnvironment(parallelism, config));
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment(java.lang.String, int, java.lang.String...)}
	 *
	 * @param host
	 *		The host name or address of the master (JobManager), where the
	 *		program should be executed.
	 * @param port
	 *		The port of the master (JobManager), where the program should
	 *		be executed.
	 * @param jar_files
	 *		The JAR files with code that needs to be shipped to the
	 *		cluster. If the program uses user-defined functions,
	 *		user-defined input formats, or any libraries, those must be
	 *		provided in the JAR files.
	 * @return A remote environment that executes the program on a cluster.
	 */
	public static PythonStreamExecutionEnvironment create_remote_execution_environment(
		String host, int port, String... jar_files) {
		return new PythonStreamExecutionEnvironment(
						StreamExecutionEnvironment.createRemoteEnvironment(host, port, jar_files));
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment(
	 * java.lang.String, int, Configuration, java.lang.String...)}
	 *
	 * @param host
	 *		The host name or address of the master (JobManager), where the
	 *		program should be executed.
	 * @param port
	 *		The port of the master (JobManager), where the program should
	 *		be executed.
	 * @param config
	 *		The configuration used by the client that connects to the remote cluster.
	 * @param jar_files
	 *		The JAR files with code that needs to be shipped to the
	 *		cluster. If the program uses user-defined functions,
	 *		user-defined input formats, or any libraries, those must be
	 *		provided in the JAR files.
	 * @return A remote environment that executes the program on a cluster.
	 */
	public static PythonStreamExecutionEnvironment create_remote_execution_environment(
		String host, int port, Configuration config, String... jar_files) {
		return new PythonStreamExecutionEnvironment(
						StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jar_files));
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment(
	 * java.lang.String, int, int, java.lang.String...)}
	 *
	 * @param host
	 *		The host name or address of the master (JobManager), where the
	 *		program should be executed.
	 * @param port
	 *		The port of the master (JobManager), where the program should
	 *		be executed.
	 * @param parallelism
	 *		The parallelism to use during the execution.
	 * @param jar_files
	 *		The JAR files with code that needs to be shipped to the
	 *		cluster. If the program uses user-defined functions,
	 *		user-defined input formats, or any libraries, those must be
	 *		provided in the JAR files.
	 * @return A remote environment that executes the program on a cluster.
	 */
	public static PythonStreamExecutionEnvironment create_remote_execution_environment(
		String host, int port, int parallelism, String... jar_files) {
		return new PythonStreamExecutionEnvironment(
						StreamExecutionEnvironment.createRemoteEnvironment(host, port, parallelism, jar_files));
	}

	/**
	 * Wraps the given environment and registers the Jython Kryo serializers, so that
	 * python objects flowing through the topology can be serialized.
	 */
	private PythonStreamExecutionEnvironment(StreamExecutionEnvironment env) {
		this.env = env;
		this.registerJythonSerializers();
	}

	// Registers PyObjectSerializer for each concrete Jython type that may appear in the stream.
	private void registerJythonSerializers() {
		this.env.registerTypeWithKryoSerializer(PyString.class, PyObjectSerializer.class);
		this.env.registerTypeWithKryoSerializer(PyInteger.class, PyObjectSerializer.class);
		this.env.registerTypeWithKryoSerializer(PyLong.class, PyObjectSerializer.class);
		this.env.registerTypeWithKryoSerializer(PyUnicode.class, PyObjectSerializer.class);
		this.env.registerTypeWithKryoSerializer(PyTuple.class, PyObjectSerializer.class);
		this.env.registerTypeWithKryoSerializer(PyObjectDerived.class, PyObjectSerializer.class);
		this.env.registerTypeWithKryoSerializer(PyInstance.class, PyObjectSerializer.class);
	}

	/**
	 * Adds a python generator source to the streaming topology. The emitted elements are
	 * passed through a {@code UtilityFunctions.SerializerMap} before entering the stream.
	 *
	 * @param src the source function generating the python elements
	 * @return A python data stream wrapping the new source
	 * @throws Exception if wrapping the source function fails
	 */
	public PythonDataStream create_python_source(SourceFunction<Object> src) throws Exception {
		return new PythonDataStream<>(env.addSource(new PythonGeneratorFunction(src)).map(new UtilityFunctions.SerializerMap<>()));
	}

	/**
	 * Add a java source to the streaming topology. The source is expected to be a java based
	 * implementation (e.g. Kafka connector).
	 *
	 * @param src  A native java source (e.g. PythonFlinkKafkaConsumer09)
	 * @return Python data stream
	 */
	public PythonDataStream add_java_source(SourceFunction<Object> src) {
		return new PythonDataStream<>(env.addSource(src).map(new UtilityFunctions.SerializerMap<>()));
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#fromElements(java.lang.Object[])}
	 *
	 * <p>NOTE(review): unlike the other sources in this class, the elements are not passed
	 * through {@code UtilityFunctions.SerializerMap} — confirm this is intentional.</p>
	 *
	 * @param elements
	 *		The array of PyObject elements to create the data stream from.
	 * @return The data stream representing the given array of elements
	 */
	public PythonDataStream from_elements(PyObject... elements) {
		return new PythonDataStream<>(env.fromElements(elements));
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#fromCollection(java.util.Collection)}
	 *
	 * <p>The input {@code Collection} is of type {@code Object}, because it is a collection
	 * of Python elements. Their type is determined at runtime, by the Jython framework.</p>
	 *
	 * @param collection
	 *		The collection of python elements to create the data stream from.
	 * @return
	 *     The data stream representing the given collection
	 */
	public PythonDataStream from_collection(Collection<Object> collection) {
		return new PythonDataStream<>(env.fromCollection(collection).map(new UtilityFunctions.SerializerMap<>()));
	}

	/**
	 * Creates a python data stream from the given iterator.
	 *
	 * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
	 * a data stream source with a parallelism of one.</p>
	 *
	 * @param iter
	 *		The iterator of elements to create the data stream from
	 * @return The data stream representing the elements in the iterator
	 * @throws Exception if wrapping the iterator fails
	 * @see StreamExecutionEnvironment#fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
	 */
	public PythonDataStream from_collection(Iterator<Object> iter) throws Exception  {
		return new PythonDataStream<>(env.addSource(new PythonIteratorFunction(iter), TypeExtractor.getForClass(Object.class))
			.map(new UtilityFunctions.SerializerMap<>()));
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#generateSequence(long, long)}.
	 *
	 * @param from
	 *		The number to start at (inclusive)
	 * @param to
	 *		The number to stop at (inclusive)
	 * @return A python data stream, containing all numbers in the [from, to] interval
	 */
	public PythonDataStream generate_sequence(long from, long to) {
		return new PythonDataStream<>(env.generateSequence(from, to).map(new UtilityFunctions.SerializerMap<Long>()));
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#readTextFile(java.lang.String)}.
	 *
	 * @param path
	 *		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
	 * @return The data stream that represents the data read from the given file as text lines
	 * @throws IOException if the file cannot be read
	 */
	public PythonDataStream read_text_file(String path) throws IOException {
		return new PythonDataStream<>(env.readTextFile(path).map(new UtilityFunctions.SerializerMap<String>()));
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#socketTextStream(java.lang.String, int)}.
	 *
	 * @param host
	 *		The host name which a server socket binds
	 * @param port
	 *		The port number which a server socket binds. A port number of 0 means that the port number is automatically
	 *		allocated.
	 * @return A python data stream containing the strings received from the socket
	 */
	public PythonDataStream socket_text_stream(String host, int port) {
		return new PythonDataStream<>(env.socketTextStream(host, port).map(new UtilityFunctions.SerializerMap<String>()));
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#enableCheckpointing(long)}.
	 *
	 * @param interval Time interval between state checkpoints in milliseconds.
	 * @return The same {@code PythonStreamExecutionEnvironment} instance of the caller
	 */
	public PythonStreamExecutionEnvironment enable_checkpointing(long interval) {
		this.env.enableCheckpointing(interval);
		return this;
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#enableCheckpointing(long, CheckpointingMode)}.
	 *
	 * @param interval Time interval between state checkpoints in milliseconds.
	 * @param mode
	 *             The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
	 * @return The same {@code PythonStreamExecutionEnvironment} instance of the caller
	 */
	public PythonStreamExecutionEnvironment enable_checkpointing(long interval, CheckpointingMode mode) {
		this.env.enableCheckpointing(interval, mode);
		return this;
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#setParallelism(int)}.
	 *
	 * @param parallelism The parallelism
	 * @return The same {@code PythonStreamExecutionEnvironment} instance of the caller
	 */
	public PythonStreamExecutionEnvironment set_parallelism(int parallelism) {
		this.env.setParallelism(parallelism);
		return this;
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#execute()}.
	 *
	 * @return The result of the job execution
	 * @throws Exception which occurs during job execution.
	 */
	public JobExecutionResult execute() throws Exception {
		return execute(false);
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#execute()}.
	 *
	 * <p>In addition, it enables the caller to provide a hint about the execution mode - whether it is local
	 * or remote. In the case of local execution, the relevant cached files are distributed using the
	 * local machine temporary folder, otherwise a shared storage medium is used for this purpose.</p>
	 *
	 * @param local hint that the job runs locally
	 * @return The result of the job execution
	 * @throws Exception which occurs during job execution.
	 */
	public JobExecutionResult execute(Boolean local) throws Exception {
		if (PythonEnvironmentConfig.pythonTmpCachePath == null) {
			// Nothing to be done! It is executed on the task manager.
			// NOTE(review): returns a placeholder result (null job id / accumulators) — confirm callers tolerate this.
			return new JobExecutionResult(null, 0, null);
		}
		distributeFiles(local);
		JobExecutionResult result = this.env.execute();
		cleanupDistributedFiles();
		return result;
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#execute(java.lang.String)}.
	 *
	 * @param job_name the name of the job to execute
	 * @return The result of the job execution, containing elapsed time and accumulators.
	 * @throws Exception which occurs during job execution.
	 */
	public JobExecutionResult execute(String job_name) throws Exception {
		return execute(job_name, false);
	}

	/**
	 * A thin wrapper layer over {@link StreamExecutionEnvironment#execute(java.lang.String)}.
	 *
	 * <p>In addition, it enables the caller to provide a hint about the execution mode - whether it is local
	 * or remote. In the case of local execution, the relevant cached files are distributed using the
	 * local machine temporary folder, otherwise a shared storage medium is used for this purpose.</p>
	 *
	 * @param job_name the name of the job to execute
	 * @param local hint that the job runs locally
	 * @return The result of the job execution, containing elapsed time and accumulators.
	 * @throws Exception which occurs during job execution.
	 */
	public JobExecutionResult execute(String job_name, Boolean local) throws Exception {
		if (PythonEnvironmentConfig.pythonTmpCachePath == null) {
			// Nothing to be done! It is executed on the task manager.
			// NOTE(review): returns a placeholder result (null job id / accumulators) — confirm callers tolerate this.
			return new JobExecutionResult(null, 0, null);
		}
		distributeFiles(local);
		JobExecutionResult result = this.env.execute(job_name);
		cleanupDistributedFiles();
		return result;
	}

	/**
	 * Copies the python cache directory to a job-unique staging location and registers it
	 * as a distributed-cache entry, so the python files are shipped to the task managers.
	 * Local execution stages under the machine's temporary folder, otherwise under an HDFS path.
	 *
	 * @param local hint that the job runs locally even if the environment type does not show it
	 * @throws IOException if copying the cached files fails
	 * @throws URISyntaxException if the cache path cannot be parsed as a URI
	 */
	private void distributeFiles(boolean local) throws IOException, URISyntaxException
	{
		String rootDir;
		if (local || this.env instanceof LocalStreamEnvironment) {
			rootDir = System.getProperty("java.io.tmpdir");
		} else {
			rootDir = "hdfs:///tmp";
		}
		// NOTE(review): Paths.get() treats "hdfs:///tmp" as a local path and normalizes the
		// slashes — verify the resulting string is still a valid HDFS URI for Flink's Path.
		PythonEnvironmentConfig.FLINK_HDFS_PATH = Paths.get(rootDir, "flink_cache_" +
			(new Random(System.currentTimeMillis())).nextLong()).toString();

		// Recursively copy the temporary python cache into the staging location.
		FileCache.copy(new Path(PythonEnvironmentConfig.pythonTmpCachePath),
			new Path(PythonEnvironmentConfig.FLINK_HDFS_PATH), true);

		this.env.registerCachedFile(PythonEnvironmentConfig.FLINK_HDFS_PATH, PythonEnvironmentConfig.FLINK_PYTHON_DC_ID);
	}

	/**
	 * Deletes every registered distributed-cache file from its backing file system,
	 * after the job has finished.
	 *
	 * @throws IOException if a delete operation fails
	 * @throws URISyntaxException if a registered file path is not a valid URI
	 */
	private void cleanupDistributedFiles() throws IOException, URISyntaxException {
		for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : this.env.getCachedFiles()) {
			URI fileUri = new URI(e.f1.filePath);
			FileSystem fs = FileSystem.get(fileUri);
			LOG.debug(String.format("Cleaning up cached path: %s, uriPath: %s, fileSystem: %s",
				e.f1.filePath,
				fileUri.getPath(),
				fs.getClass().getName()));
			fs.delete(new Path(fileUri.getPath()), true);
		}
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java
new file mode 100644
index 0000000..52ac4ff
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+
+/**
+ * A key type used by {@link 
org.apache.flink.streaming.python.api.functions.PythonKeySelector} to
+ * host a python object and provide the necessary interface to compare two 
python objects.
+ * It is used internally by the python thin wrapper layer over the streaming 
data sets.
+ */
public class PyKey {
	/** The wrapped python object serving as the key (may be {@code null}). */
	private Object data;

	/**
	 * Creates a key wrapping the given python object.
	 *
	 * @param data the python object serving as the key
	 */
	public PyKey(Object data) {
		this.data = data;
	}

	/** @return the wrapped python object */
	public Object getData() {
		return data;
	}

	/** @param data the new python object to wrap */
	public void setData(Object data) {
		this.data = data;
	}

	/**
	 * Two keys are equal iff their wrapped objects are equal.
	 * Null-safe: the previous version threw a NullPointerException when either
	 * key wrapped {@code null}.
	 */
	@Override
	public boolean equals(Object other) {
		if (this == other) {
			return true;
		}
		if (!(other instanceof PyKey)) {
			return false;
		}
		Object otherData = ((PyKey) other).data;
		return data == null ? otherData == null : data.equals(otherData);
	}

	@Override
	public int hashCode() {
		// Consistent with equals: null data hashes to 0 instead of throwing NPE.
		return data == null ? 0 : data.hashCode();
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java
new file mode 100644
index 0000000..8fe3245
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.python.util.PythonCollector;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.apache.flink.util.Collector;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonApplyFunction} is a thin wrapper layer over a Python UDF 
{@code WindowFunction}.
+ * It receives an {@code WindowFunction} as an input and keeps it internally 
in a serialized form.
+ * It is then delivered, as part of job graph, up to the TaskManager, then it 
is opened and becomes
+ * a sort of mediator to the Python UDF {@code WindowFunction}.
+ *
+ * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
+ * functionality</p>
+ */
public class PythonApplyFunction<W extends Window> extends RichWindowFunction<PyObject, PyObject, PyKey, W> {
	private static final long serialVersionUID = 577032239468987781L;

	// The Python UDF in serialized form; a byte[] keeps this wrapper serializable for the job graph.
	private final byte[] serFun;
	// The deserialized UDF; rebuilt on the TaskManager in open().
	private transient WindowFunction<PyObject, PyObject, Object, Window> fun;
	// Reusable adapter forwarding collected elements; its target is reset on every apply() call.
	private transient PythonCollector collector;

	/**
	 * Serializes the given window function so it can be shipped with the job graph.
	 *
	 * @param fun the window function to wrap
	 * @throws IOException if the function cannot be serialized
	 */
	public PythonApplyFunction(WindowFunction<PyObject, PyObject, Object, W> fun) throws IOException {
		this.serFun = SerializationUtils.serializeObject(fun);
	}

	@Override
	@SuppressWarnings("unchecked")
	public void open(Configuration parameters) throws Exception {
		// Rebuild the UDF from its serialized form on the TaskManager.
		this.fun =
			(WindowFunction<PyObject, PyObject, Object, Window>) UtilityFunctions.smartFunctionDeserialization(
			getRuntimeContext(), this.serFun);
		// If the UDF is itself rich, forward the runtime context and the open() lifecycle call.
		if (this.fun instanceof RichWindowFunction) {
			final RichWindowFunction winFun = (RichWindowFunction) this.fun;
			winFun.setRuntimeContext(getRuntimeContext());
			winFun.open(parameters);
		}
		this.collector = new PythonCollector();
	}

	@Override
	public void close() throws Exception {
		// Forward the close() lifecycle call only for rich UDFs.
		if (this.fun instanceof RichWindowFunction) {
			((RichWindowFunction) this.fun).close();
		}
	}

	@Override
	public void apply(PyKey key, W window, Iterable<PyObject> values, Collector<PyObject> out) throws Exception {
		// Re-target the shared collector at this invocation's output, then delegate to the UDF,
		// unwrapping the PyKey so the UDF sees the underlying python object.
		this.collector.setCollector(out);
		this.fun.apply(key.getData(), window, values, this.collector);
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java
new file mode 100644
index 0000000..345fdcc
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+
+/**
+ * The {@code PythonFilterFunction} is a thin wrapper layer over a Python UDF 
{@code FilterFunction}.
+ * It receives a {@code FilterFunction} as an input and keeps it internally in 
a serialized form.
+ * It is then delivered, as part of job graph, up to the TaskManager, then it 
is opened and becomes
+ * a sort of mediator to the Python UDF {@code FilterFunction}.
+ *
+ * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
+ * functionality</p>
+ */
+public class PythonFilterFunction extends RichFilterFunction<PyObject> {
+       private static final long serialVersionUID = 775688642701399472L;
+
+       private final byte[] serFun;
+       private transient FilterFunction<PyObject> fun;
+
+       public PythonFilterFunction(FilterFunction<PyObject> fun) throws 
IOException {
+               this.serFun = SerializationUtils.serializeObject(fun);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open(Configuration parameters) throws Exception {
+               this.fun = (FilterFunction<PyObject>) 
UtilityFunctions.smartFunctionDeserialization(
+                       getRuntimeContext(), this.serFun);
+               if (this.fun instanceof RichFunction) {
+                       final RichFilterFunction filterFun = 
(RichFilterFunction)this.fun;
+                       filterFun.setRuntimeContext(getRuntimeContext());
+                       filterFun.open(parameters);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (this.fun instanceof RichFunction) {
+                       ((RichFilterFunction)this.fun).close();
+               }
+       }
+
+       @Override
+       public boolean filter(PyObject value) throws Exception {
+               return this.fun.filter(value);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java
new file mode 100644
index 0000000..21554c7
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.python.util.PythonCollector;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.apache.flink.util.Collector;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+
+/**
+ * The {@code PythonFlatMapFunction} is a thin wrapper layer over a Python UDF 
{@code FlatMapFunction}.
+ * It receives a {@code FlatMapFunction} as an input and keeps it internally 
in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then 
it is opened and becomes
+ * a sort of mediator to the Python UDF {@code FlatMapFunction}.
+ *
+ * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
+ * functionality</p>
+ */
+public class PythonFlatMapFunction extends RichFlatMapFunction<PyObject, 
PyObject> {
+       private static final long serialVersionUID = -6098432222172956477L;
+
+       private final byte[] serFun;
+       private transient FlatMapFunction<PyObject, PyObject> fun;
+       private transient PythonCollector collector;
+
+       public PythonFlatMapFunction(FlatMapFunction<PyObject, PyObject> fun) 
throws IOException {
+               this.serFun = SerializationUtils.serializeObject(fun);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open(Configuration config) throws Exception {
+               this.fun =
+                       (FlatMapFunction<PyObject, PyObject>) 
UtilityFunctions.smartFunctionDeserialization(
+                               getRuntimeContext(), serFun);
+               if (this.fun instanceof RichFunction) {
+                       final RichFlatMapFunction flatMapFun = 
(RichFlatMapFunction)this.fun;
+                       flatMapFun.setRuntimeContext(getRuntimeContext());
+                       flatMapFun.open(config);
+               }
+               this.collector = new PythonCollector();
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (this.fun instanceof RichFunction) {
+                       ((RichFlatMapFunction)this.fun).close();
+               }
+       }
+
+       @Override
+       public void flatMap(PyObject value, Collector<PyObject> out) throws 
Exception {
+               this.collector.setCollector(out);
+               this.fun.flatMap(value, this.collector);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java
new file mode 100644
index 0000000..70f212e
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonGeneratorFunction} is a thin wrapper layer over a Python 
UDF {@code SourceFunction}.
+ * It receives a {@code SourceFunction} as an input and keeps it internally in 
a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then 
it is opened and becomes
+ * a sort of mediator to the Python UDF {@code SourceFunction}.
+ *
+ * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
+ * functionality</p>
+ */
+public class PythonGeneratorFunction extends RichSourceFunction<Object> {
+       private static final long serialVersionUID = 3854587935845323082L;
+
+       private final byte[] serFun;
+       private transient SourceFunction<Object> fun;
+
+       public PythonGeneratorFunction(SourceFunction<Object> fun) throws 
IOException {
+               this.serFun = SerializationUtils.serializeObject(fun);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open(Configuration parameters) throws Exception {
+               this.fun = (SourceFunction<Object>) 
UtilityFunctions.smartFunctionDeserialization(
+                       getRuntimeContext(), this.serFun);
+       }
+
+       @Override
+       public void close() throws Exception {}
+
+       public void run(SourceContext<Object> ctx) throws Exception {
+               this.fun.run(ctx);
+       }
+
+       public void cancel() {
+               if (this.fun != null) {
+                       this.fun.cancel();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java
new file mode 100644
index 0000000..38bf175
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+
+import java.util.Iterator;
+import java.io.IOException;
+
+/**
+ * The {@code PythonIteratorFunction} is a thin wrapper layer over a Python 
UDF {@code Iterator}.
+ * It receives an {@code Iterator} as an input and keeps it internally in a 
serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then 
it is opened and becomes
+ * a sort of mediator to the Python UDF {@code Iterator}.
+ *
+ * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
+ * functionality</p>
+ */
+public class PythonIteratorFunction extends RichSourceFunction<Object> {
+       private static final long serialVersionUID = 6741748297048588334L;
+
+       private final byte[] serFun;
+       private transient Iterator<Object> fun;
+       private transient volatile boolean isRunning;
+
+       public PythonIteratorFunction(Iterator<Object> fun) throws IOException {
+               this.serFun = SerializationUtils.serializeObject(fun);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open(Configuration parameters) throws Exception {
+               this.isRunning = true;
+               this.fun = (Iterator<Object>) 
UtilityFunctions.smartFunctionDeserialization(getRuntimeContext(), this.serFun);
+       }
+
+       @Override
+       public void close() throws Exception {
+       }
+
+       @Override
+       public void run(SourceContext<Object> ctx) throws Exception {
+               while (isRunning && this.fun.hasNext()) {
+                       ctx.collect(this.fun.next());
+               }
+       }
+
+       @Override
+       public void cancel() {
+               isRunning = false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java
new file mode 100644
index 0000000..7cdaacb
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonKeySelector} is a thin wrapper layer over a Python UDF 
{@code KeySelector}.
+ * It receives a {@code KeySelector} as an input and keeps it internally in a 
serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then 
it is opened and becomes
+ * a sort of mediator to the Python UDF {@code KeySelector}.
+ *
+ * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
+ * functionality</p>
+ */
+public class PythonKeySelector implements KeySelector<PyObject, PyKey> {
+       private static final long serialVersionUID = 7403775239671366607L;
+       private final byte[] serFun;
+       private transient KeySelector<PyObject, Object> fun;
+
+       public PythonKeySelector(KeySelector<PyObject, PyKey> fun) throws 
IOException {
+               this.serFun = SerializationUtils.serializeObject(fun);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public PyKey getKey(PyObject value) throws Exception {
+               if (fun == null) {
+                       fun = (KeySelector<PyObject, Object>) 
SerializationUtils.deserializeObject(serFun);
+               }
+               Object key = fun.getKey(value);
+               return new PyKey(key);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java
new file mode 100644
index 0000000..24d19da
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonMapFunction} is a thin wrapper layer over a Python UDF 
{@code MapFunction}.
+ * It receives a {@code MapFunction} as an input and keeps it internally in a 
serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then 
it is opened and becomes
+ * a sort of mediator to the Python UDF {@code MapFunction}.
+ *
+ * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
+ * functionality</p>
+ */
+public class PythonMapFunction extends RichMapFunction<PyObject, PyObject> {
+       private static final long serialVersionUID = 3001212087036451818L;
+       private final byte[] serFun;
+       private transient MapFunction<PyObject, PyObject> fun;
+
+       public PythonMapFunction(MapFunction<PyObject, PyObject> fun) throws 
IOException {
+               this.serFun = SerializationUtils.serializeObject(fun);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open(Configuration config) throws Exception {
+               this.fun = (MapFunction<PyObject, PyObject>) 
UtilityFunctions.smartFunctionDeserialization(
+                       getRuntimeContext(), serFun);
+               if (this.fun instanceof RichFunction) {
+                       final RichMapFunction mapFun = 
(RichMapFunction)this.fun;
+                       mapFun.setRuntimeContext(getRuntimeContext());
+                       mapFun.open(config);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (this.fun instanceof RichFunction) {
+                       ((RichMapFunction)this.fun).close();
+               }
+       }
+
+       @Override
+       public PyObject map(PyObject value) throws Exception {
+               return UtilityFunctions.adapt(this.fun.map(value));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
new file mode 100644
index 0000000..f022322
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonOutputSelector} is a thin wrapper layer over a Python UDF 
{@code OutputSelector}.
+ * It receives an {@code OutputSelector} as an input and keeps it internally 
in a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then 
it is opened and becomes
+ * a sort of mediator to the Python UDF {@code OutputSelector}.
+ *
+ * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
+ * functionality</p>
+ */
+public class PythonOutputSelector implements OutputSelector<PyObject> {
+       private static final long serialVersionUID = 909266346633598177L;
+
+       private final byte[] serFun;
+       private transient OutputSelector<PyObject> fun;
+
+       public PythonOutputSelector(OutputSelector<PyObject> fun) throws 
IOException {
+               this.serFun = SerializationUtils.serializeObject(fun);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public Iterable<String> select(PyObject value) {
+               if (this.fun == null) {
+                       try {
+                               this.fun = (OutputSelector<PyObject>) 
SerializationUtils.deserializeObject(this.serFun);
+                       } catch (IOException e) {
+                               e.printStackTrace();
+                       } catch (ClassNotFoundException e) {
+                               e.printStackTrace();
+                       }
+               }
+               if (this.fun == null) {
+                       return null;
+               }
+               return this.fun.select(value);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java
new file mode 100644
index 0000000..d02240c
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import java.io.IOException;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+/**
+ * The {@code PythonReduceFunction} is a thin wrapper layer over a Python UDF 
{@code ReduceFunction}.
+ * It receives a {@code ReduceFunction} as an input and keeps it internally in 
a serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then 
it is opened and becomes
+ * a sort of mediator to the Python UDF {@code ReduceFunction}.
+ *
+ * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
+ * functionality</p>
+ */
+public class PythonReduceFunction implements ReduceFunction<PyObject> {
+       private static final long serialVersionUID = -9070596504893036458L;
+
+       private final byte[] serFun;
+       private transient ReduceFunction<PyObject> fun;
+
+       public PythonReduceFunction(ReduceFunction<PyObject> fun) throws 
IOException {
+               this.serFun = SerializationUtils.serializeObject(fun);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public PyObject reduce(PyObject value1, PyObject value2) throws 
Exception {
+               if (fun == null) {
+                       fun = (ReduceFunction<PyObject>) 
SerializationUtils.deserializeObject(serFun);
+               }
+
+               return UtilityFunctions.adapt(this.fun.reduce(value1, value2));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java
new file mode 100644
index 0000000..e0351b0
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * The {@code PythonSinkFunction} is a thin wrapper layer over a Python UDF 
{@code SinkFunction}.
+ * It receives a {@code SinkFunction} as an input and keeps it internally in a 
serialized form.
+ * It is then delivered, as part of the job graph, up to the TaskManager, then 
it is opened and becomes
+ * a sort of mediator to the Python UDF {@code SinkFunction}.
+ *
+ * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
+ * functionality</p>
+ */
+public class PythonSinkFunction extends RichSinkFunction<PyObject> {
+       private static final long serialVersionUID = -9030596504893036458L;
+       private final byte[] serFun;
+       private transient SinkFunction<PyObject> fun;
+       private transient RuntimeContext context;
+
+       public PythonSinkFunction(SinkFunction<PyObject> fun) throws 
IOException {
+               this.serFun = SerializationUtils.serializeObject(fun);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open(Configuration parameters) throws Exception {
+               this.fun = (SinkFunction<PyObject>) 
UtilityFunctions.smartFunctionDeserialization(
+                       getRuntimeContext(), this.serFun);
+               if (this.fun instanceof RichFunction) {
+                       final RichSinkFunction sinkFunction = 
(RichSinkFunction)this.fun;
+                       sinkFunction.setRuntimeContext(getRuntimeContext());
+                       sinkFunction.open(parameters);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (this.fun instanceof RichFunction) {
+                       ((RichSinkFunction)this.fun).close();
+               }
+       }
+
+       @Override
+       public void invoke(PyObject value) throws Exception {
+               this.fun.invoke(value);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java
new file mode 100644
index 0000000..0417a43
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.streaming.python.api.environment.PythonEnvironmentConfig;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.python.core.Py;
+import org.python.core.PyObject;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.python.core.PySystemState;
+import org.python.util.PythonInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * A collection of utility functions that are used by the python wrappers thin 
layer over
+ * the streaming functions.
+ */
+public class UtilityFunctions {
+       private static boolean jythonInitialized = false;
+       private static PythonInterpreter pythonInterpreter;
+       private static final Logger LOG = 
LoggerFactory.getLogger(UtilityFunctions.class);
+
+       private UtilityFunctions() {
+       }
+
+       /**
+        * A generic map operator that convert any java type to PyObject. It is 
mainly used to convert elements
+        * collected from a source functions, to PyObject objects.
+        *
+        * @param <IN> Any given java object
+        */
+       public static class SerializerMap<IN> implements MapFunction<IN, 
PyObject> {
+               private static final long serialVersionUID = 
1582769662549499373L;
+
+               @Override
+               public PyObject map(IN value) throws Exception {
+                       return UtilityFunctions.adapt(value);
+               }
+       }
+
+       /**
+        * Convert java object to its corresponding PyObject representation.
+        *
+        * @param o Java object
+        * @return PyObject
+        */
+       public static PyObject adapt(Object o) {
+               if (o instanceof PyObject) {
+                       return (PyObject)o;
+               }
+               return  Py.java2py(o);
+       }
+
+       /**
+        * Deserializes Python UDF functions in a "smart" way. It first tries 
to extract the function using a common
+        * java method to deserialize an object. If it fails, it is assumed 
that the function object definition does not
+        * exist, and therefore, a Jython interpreter is initialized and the 
relevant python script is executed (without
+        * actually executing the 'execute' function). This result from this 
operation is that all the Python UDF objects
+        * that are defined in that python script are also defined in the local 
JVM. Is is now possible to extract
+        * the function using a common java method to deserialize the java 
function.
+        *
+        * @param runtimeCtx The runtime context of the executed job.
+        * @param serFun A serialized form of the given Python UDF
+        * @return An extracted java function
+        * @throws IOException
+        * @throws ClassNotFoundException
+        * @throws InterruptedException
+        */
+       public static synchronized Object 
smartFunctionDeserialization(RuntimeContext runtimeCtx, byte[] serFun) throws 
IOException, ClassNotFoundException, InterruptedException {
+               try {
+                       return SerializationUtils.deserializeObject(serFun);
+               } catch (Exception e) {
+                       String path = 
runtimeCtx.getDistributedCache().getFile(PythonEnvironmentConfig.FLINK_PYTHON_DC_ID).getAbsolutePath();
+
+                       initPythonInterpreter(path, new String[]{""});
+
+                       PySystemState pySysStat = Py.getSystemState();
+                       pySysStat.path.add(0, path);
+
+                       String scriptFullPath = path + File.separator + 
PythonEnvironmentConfig.FLINK_PYTHON_PLAN_NAME;
+                       LOG.debug("Execute python script, path=" + 
scriptFullPath);
+                       pythonInterpreter.execfile(scriptFullPath);
+
+                       pySysStat.path.remove(path);
+
+                       return SerializationUtils.deserializeObject(serFun);
+               }
+       }
+
+       /**
+        * Initializes the Jython interpreter and executes a python script.
+        *
+        * @param scriptFullPath The script full path
+        * @param args Command line arguments that will be delivered to the 
executed python script
+        * @throws IOException
+        */
+       public static void initAndExecPythonScript(File scriptFullPath, 
String[] args) throws IOException {
+               initPythonInterpreter(scriptFullPath.getParent(), args);
+               pythonInterpreter.execfile(scriptFullPath.getAbsolutePath());
+
+               LOG.debug("Cleaning up temporary folder: " + 
scriptFullPath.getParent());
+               FileSystem fs = FileSystem.get(scriptFullPath.toURI());
+               fs.delete(new Path(scriptFullPath.getParent()), true);
+       }
+
+       private static synchronized void initPythonInterpreter(String 
pythonPath, String[] args) {
+               if (!jythonInitialized) {
+                       LOG.debug("Init python interpreter, path=" + 
pythonPath);
+                       Properties postProperties = new Properties();
+                       postProperties.put("python.path", pythonPath);
+                       PythonInterpreter.initialize(System.getProperties(), 
postProperties, args);
+
+                       pythonInterpreter = new PythonInterpreter();
+                       pythonInterpreter.setErr(System.err);
+                       pythonInterpreter.setOut(System.out);
+
+                       jythonInitialized = true;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java
new file mode 100644
index 0000000..e6598a2
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.connectors;
+
+import 
org.apache.flink.streaming.python.util.serialization.PythonDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Defines a generic implementation of the {@link FlinkKafkaConsumer09} class, 
by using a type
+ * parameter of an {@code Object} class. The thin Python streaming layer 
converts the elements
+ * that are read from the Kafka feed, to a {@code PyObject} for further 
handling.
+ */
+public class PythonFlinkKafkaConsumer09 extends FlinkKafkaConsumer09<Object> {
+
+       /**
+        * Creates a Kafka 0.9 consumer for the given topic.
+        *
+        * @param topic The Kafka topic to read from.
+        * @param schema The user-provided deserialization schema; wrapped in a
+        *               {@link PythonDeserializationSchema}, which stores it in serialized form.
+        * @param props The Kafka consumer properties.
+        * @throws IOException If the given schema cannot be serialized.
+        */
+       public PythonFlinkKafkaConsumer09(String topic, 
DeserializationSchema<Object> schema, Properties props) throws IOException {
+               super(topic, new PythonDeserializationSchema(schema), props);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java
new file mode 100644
index 0000000..b02b893
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.connectors;
+
+import 
org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * Defines a generic Python implementation of {@link FlinkKafkaProducer09}, by 
using a type
+ * parameter of {@code PyObject} class. It can be used as a sink in the python 
code.
+ */
+public class PythonFlinkKafkaProducer09 extends FlinkKafkaProducer09<PyObject> 
{
+
+       /**
+        * Creates a Kafka 0.9 producer writing to the given topic.
+        *
+        * @param brokerList Comma-separated list of Kafka brokers.
+        * @param topicId The Kafka topic to write to.
+        * @param serializationSchema The user-provided schema; wrapped in a
+        *        {@link PythonSerializationSchema}, which stores it in serialized form.
+        * @throws IOException If the given schema cannot be serialized.
+        */
+       public PythonFlinkKafkaProducer09(String brokerList, String topicId, 
SerializationSchema<PyObject> serializationSchema) throws IOException {
+               super(brokerList, topicId, new 
PythonSerializationSchema(serializationSchema));
+       }
+
+       // Snake_case delegate — presumably named for a Python-idiomatic API; TODO confirm.
+       public void set_log_failures_only(boolean logFailuresOnly) {
+               this.setLogFailuresOnly(logFailuresOnly);
+       }
+
+       // Snake_case delegate — presumably named for a Python-idiomatic API; TODO confirm.
+       public void set_flush_on_checkpoint(boolean flush) {
+               this.setFlushOnCheckpoint(flush);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
new file mode 100644
index 0000000..4caf668
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.util.Collector;
+import org.python.core.PyObject;
+
+/**
+ * Collects a {@code PyObject} record and forwards it. It makes sure that the 
record is converted,
+ * if necessary, to a {@code PyObject}.
+ */
+@Public
+public class PythonCollector implements Collector<PyObject> {
+       private Collector<PyObject> collector;
+
+       public void setCollector(Collector<PyObject> collector) {
+               this.collector = collector;
+       }
+
+       @Override
+       public void collect(PyObject record) {
+               PyObject po = UtilityFunctions.adapt(record);
+               this.collector.collect(po);
+       }
+
+       @Override
+       public void close() {
+               this.collector.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java
new file mode 100644
index 0000000..55fbd69
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * An interface definition for Python iterator. It is extended by Python 
user-defined
+ * functions (UDF), within the Python scripts.
+ */
+public interface PythonIterator extends Iterator, Serializable {
+       // NOTE(review): extends the raw Iterator type; elements are presumably PyObject —
+       // consider parameterizing (e.g. Iterator<Object>) once the impact on existing
+       // Jython UDF implementations is confirmed.
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
new file mode 100644
index 0000000..2c8a257
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+/**
+ * A serializer implementation for PyObject class type. Is is used by the Kryo 
serialization
+ * framework. {@see https://github.com/EsotericSoftware/kryo#serializers}
+ */
+public class PyObjectSerializer extends Serializer<PyObject> {
+
+       public void write (Kryo kryo, Output output, PyObject po) {
+               try {
+                       byte[] serPo = SerializationUtils.serializeObject(po);
+                       output.writeInt(serPo.length);
+                       output.write(serPo);
+               } catch (IOException e) {
+                       e.printStackTrace();
+               }
+       }
+
+       public PyObject read (Kryo kryo, Input input, Class<PyObject> type) {
+               int len = input.readInt();
+               byte[] serPo = new byte[len];
+               input.read(serPo);
+               PyObject po = null;
+               try {
+                       po = (PyObject) 
SerializationUtils.deserializeObject(serPo);
+               } catch (IOException e) {
+                       e.printStackTrace();
+               } catch (ClassNotFoundException e) {
+                       e.printStackTrace();
+               }
+               return po;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java
new file mode 100644
index 0000000..a51db89
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.io.IOException;
+
+/**
+ * A Python deserialization schema, which implements {@link 
DeserializationSchema}. It converts a
+ * serialized form of {@code PyObject} into its Java Object representation.
+ */
+public class PythonDeserializationSchema implements 
DeserializationSchema<Object> {
+       private static final long serialVersionUID = -9180596504893036458L;
+       private final TypeInformation<Object> resultType = 
TypeInformation.of(new TypeHint<Object>(){});
+
+       private final byte[] serSchema;
+       private transient DeserializationSchema<Object> schema;
+
+       public PythonDeserializationSchema(DeserializationSchema<Object> 
schema) throws IOException {
+               this.serSchema = SerializationUtils.serializeObject(schema);
+       }
+
+       @SuppressWarnings("unchecked")
+       public Object deserialize(byte[] message) throws IOException {
+               if (this.schema == null) {
+                       try {
+                               this.schema = (DeserializationSchema<Object>) 
SerializationUtils.deserializeObject(this.serSchema);
+                       } catch (ClassNotFoundException e) {
+                               e.printStackTrace();
+                       }
+               }
+               return this.schema.deserialize(message);
+       }
+
+       @Override
+       public boolean isEndOfStream(Object nextElement) {
+               return this.schema.isEndOfStream(nextElement);
+       }
+
+       @Override
+       public TypeInformation<Object> getProducedType() {
+               return resultType;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
new file mode 100644
index 0000000..a2dddaf
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+/**
+ * A Python serialization schema, which implements {@link 
SerializationSchema}. It converts
+ * a {@code PyObject} into its serialized form.
+ */
+public class PythonSerializationSchema implements 
SerializationSchema<PyObject> {
+       private static final long serialVersionUID = -9170596504893036458L;
+
+       private final byte[] serSchema;
+       private transient SerializationSchema<PyObject> schema;
+
+       public PythonSerializationSchema(SerializationSchema<PyObject> schema) 
throws IOException {
+               this.serSchema = SerializationUtils.serializeObject(schema);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public byte[] serialize(PyObject element) {
+               if (this.schema == null) {
+                       try {
+                               this.schema = 
(SerializationSchema<PyObject>)SerializationUtils.deserializeObject(this.serSchema);
+                       } catch (IOException e) {
+                               e.printStackTrace();
+                       } catch (ClassNotFoundException e) {
+                               e.printStackTrace();
+                       }
+               }
+               return this.schema.serialize(element);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java
new file mode 100644
index 0000000..b7fc2a3
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.util.serialization;
+
+import 
org.apache.flink.streaming.python.api.datastream.PythonObjectInputStream2;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * A generic serialization utility, which turns java objects into their 
serialization form as
+ * byte arrays and vise versa.
+ */
+public class SerializationUtils {
+       public static byte[] serializeObject(Object o) throws IOException {
+               try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                       oos.writeObject(o);
+                       oos.flush();
+                       return baos.toByteArray();
+               }
+       }
+
+       public static Object deserializeObject(byte[] bytes) throws 
IOException, ClassNotFoundException {
+               try (ByteArrayInputStream bais = new 
ByteArrayInputStream(bytes); ObjectInputStream ois = new 
PythonObjectInputStream2(bais)) {
+                       return ois.readObject();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 
b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
new file mode 100644
index 0000000..5e4ec51
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+       final private static String defaultPythonScriptName = 
"run_all_tests.py";
+       final private static String flinkPythonRltvPath = 
"flink-libraries/flink-streaming-python";
+       final private static String pathToStreamingTests = 
"src/test/python/org/apache/flink/streaming/python/api";
+
+       public PythonStreamBinderTest() {
+       }
+
+       public static void main(String[] args) throws Exception {
+               if (args.length == 0) {
+                       args = prepareDefaultArgs();
+               } else {
+                       args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+               }
+               PythonStreamBinder.main(args);
+       }
+
+       @Override
+       public void testProgram() throws Exception {
+               this.main(new String[]{});
+       }
+
+       private static String[] prepareDefaultArgs() throws Exception {
+               File testFullPath = findStreamTestFile(defaultPythonScriptName);
+               List<String> filesInTestPath = 
getFilesInFolder(testFullPath.getParent());
+
+               String[] args = new String[filesInTestPath.size() + 1];
+               args[0] = testFullPath.getAbsolutePath();
+
+               for (final ListIterator<String> it = 
filesInTestPath.listIterator(); it.hasNext();) {
+                       final String p = it.next();
+                       args[it.previousIndex() + 1] = p;
+               }
+               return args;
+       }
+
+       private static File findStreamTestFile(String name) throws Exception {
+               if (new File(name).exists()) {
+                       return new File(name);
+               }
+               FileSystem fs = FileSystem.getLocalFileSystem();
+               String workingDir = fs.getWorkingDirectory().getPath();
+               if (!workingDir.endsWith(flinkPythonRltvPath)) {
+                       workingDir += File.separator + flinkPythonRltvPath;
+               }
+               FileStatus[] status = fs.listStatus(
+                       new Path( workingDir + File.separator + 
pathToStreamingTests));
+               for (FileStatus f : status) {
+                       String file_name = f.getPath().getName();
+                       if (file_name.equals(name)) {
+                               return new File(f.getPath().getPath());
+                       }
+               }
+               throw new FileNotFoundException();
+       }
+
+       private static List<String> getFilesInFolder(String path) {
+               List<String> results = new ArrayList<>();
+               File[] files = new File(path).listFiles();
+               if (files != null) {
+                       for (File file : files) {
+                               if (file.isDirectory() || 
file.getName().startsWith("test_")) {
+                                       results.add("." + File.separator + 
file.getName());
+                               }
+                       }
+               }
+               return results;
+       }
+}

Reply via email to