http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
index 6710bf1..0684e35 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
@@ -15,14 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.environment;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.filecache.FileCache;
@@ -30,30 +31,37 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.python.PythonOptions;
 import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
 import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
 import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
-import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+import org.apache.flink.streaming.python.util.AdapterMap;
+import 
org.apache.flink.streaming.python.util.serialization.PyBooleanSerializer;
+import org.apache.flink.streaming.python.util.serialization.PyFloatSerializer;
+import 
org.apache.flink.streaming.python.util.serialization.PyIntegerSerializer;
+import org.apache.flink.streaming.python.util.serialization.PyLongSerializer;
 import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer;
-import org.python.core.PyObject;
-import org.python.core.PyString;
+import org.apache.flink.streaming.python.util.serialization.PyStringSerializer;
+
+import org.python.core.PyBoolean;
+import org.python.core.PyFloat;
+import org.python.core.PyInstance;
 import org.python.core.PyInteger;
 import org.python.core.PyLong;
-import org.python.core.PyUnicode;
-import org.python.core.PyTuple;
+import org.python.core.PyObject;
 import org.python.core.PyObjectDerived;
-import org.python.core.PyInstance;
+import org.python.core.PyString;
+import org.python.core.PyTuple;
+import org.python.core.PyUnicode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.Paths;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Random;
-
+import java.util.Map;
+import java.util.UUID;
 
 /**
  * A thin wrapper layer over {@link StreamExecutionEnvironment}.
@@ -69,155 +77,70 @@ import java.util.Random;
 public class PythonStreamExecutionEnvironment {
        private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
        private final StreamExecutionEnvironment env;
+       private final Path pythonTmpCachePath;
+       private final Path tmpDistributedDir;
 
-       /**
-        * A thin wrapper layer over {@link 
StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes
-        * care for required Jython serializers registration.
-        *
-        * @return The python execution environment of the context in which the 
program is
-        * executed.
-        */
-       public static PythonStreamExecutionEnvironment 
get_execution_environment() {
-               return new 
PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
+       PythonStreamExecutionEnvironment(StreamExecutionEnvironment env, Path 
tmpLocalDir, Path tmpDistributedDir, String scriptName) {
+               this.env = env;
+               this.pythonTmpCachePath = tmpLocalDir;
+               this.tmpDistributedDir = tmpDistributedDir;
+               env.getConfig().setGlobalJobParameters(new 
PythonJobParameters(scriptName));
+               registerJythonSerializers(this.env);
        }
 
        /**
-        * Creates a {@link LocalStreamEnvironment}. The local execution 
environment
-        * will run the program in a multi-threaded fashion in the same JVM as 
the
-        * environment was created in. The default parallelism of the local
-        * environment is the number of hardware contexts (CPU cores / threads),
-        * unless it was specified differently by {@link #setParallelism(int)}.
-        *
-        * @param configuration
-        *              Pass a custom configuration into the cluster
-        * @return A local execution environment with the specified parallelism.
+        * Utility class for storing/retrieving parameters in/from a {@link 
ExecutionConfig.GlobalJobParameters}.
         */
-       public static PythonStreamExecutionEnvironment 
create_local_execution_environment(Configuration config) {
-               return new PythonStreamExecutionEnvironment(new 
LocalStreamEnvironment(config));
-       }
+       public static class PythonJobParameters extends 
ExecutionConfig.GlobalJobParameters {
+               private static final String KEY_SCRIPT_NAME = "scriptName";
+               private Map<String, String> parameters = new HashMap<>();
 
-       /**
-        * A thin wrapper layer over {@link 
StreamExecutionEnvironment#createLocalEnvironment(int, Configuration)}
-        *
-        * @param parallelism
-        *              The parallelism for the local environment.
-        * @param config
-        *              Pass a custom configuration into the cluster
-        * @return A local python execution environment with the specified 
parallelism.
-        */
-       public static PythonStreamExecutionEnvironment 
create_local_execution_environment(int parallelism, Configuration config) {
-               return new PythonStreamExecutionEnvironment(
-                                               
StreamExecutionEnvironment.createLocalEnvironment(parallelism, config));
-       }
+               PythonJobParameters(String scriptName) {
+                       parameters.put(KEY_SCRIPT_NAME, scriptName);
+               }
 
-       /**
-        * A thin wrapper layer over {@link 
StreamExecutionEnvironment#createRemoteEnvironment(java.lang.String, int, 
java.lang.String...)}
-        *
-        * @param host
-        *              The host name or address of the master (JobManager), 
where the
-        *              program should be executed.
-        * @param port
-        *              The port of the master (JobManager), where the program 
should
-        *              be executed.
-        * @param jar_files
-        *              The JAR files with code that needs to be shipped to the
-        *              cluster. If the program uses user-defined functions,
-        *              user-defined input formats, or any libraries, those 
must be
-        *              provided in the JAR files.
-        * @return A remote environment that executes the program on a cluster.
-        */
-       public static PythonStreamExecutionEnvironment 
create_remote_execution_environment(
-               String host, int port, String... jar_files) {
-               return new PythonStreamExecutionEnvironment(
-                                               
StreamExecutionEnvironment.createRemoteEnvironment(host, port, jar_files));
-       }
+               public Map<String, String> toMap() {
+                       return parameters;
+               }
 
-       /**
-        * A thin wrapper layer over {@link 
StreamExecutionEnvironment#createRemoteEnvironment(
-        * java.lang.String, int, Configuration, java.lang.String...)}
-        *
-        * @param host
-        *              The host name or address of the master (JobManager), 
where the
-        *              program should be executed.
-        * @param port
-        *              The port of the master (JobManager), where the program 
should
-        *              be executed.
-        * @param config
-        *              The configuration used by the client that connects to 
the remote cluster.
-        * @param jar_files
-        *              The JAR files with code that needs to be shipped to the
-        *              cluster. If the program uses user-defined functions,
-        *              user-defined input formats, or any libraries, those 
must be
-        *              provided in the JAR files.
-        * @return A remote environment that executes the program on a cluster.
-        *
-        */
-       public static PythonStreamExecutionEnvironment 
create_remote_execution_environment(
-               String host, int port, Configuration config, String... 
jar_files) {
-               return new PythonStreamExecutionEnvironment(
-                                               
StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, 
jar_files));
+               public static String 
getScriptName(ExecutionConfig.GlobalJobParameters parameters) {
+                       return parameters.toMap().get(KEY_SCRIPT_NAME);
+               }
        }
 
-       /**
-        * A thin wrapper layer over {@link 
StreamExecutionEnvironment#createRemoteEnvironment(
-        * java.lang.String, int, int, java.lang.String...)}
-        *
-        * @param host
-        *              The host name or address of the master (JobManager), 
where the
-        *              program should be executed.
-        * @param port
-        *              The port of the master (JobManager), where the program 
should
-        *              be executed.
-        * @param parallelism
-        *              The parallelism to use during the execution.
-        * @param jar_files
-        *              The JAR files with code that needs to be shipped to the
-        *              cluster. If the program uses user-defined functions,
-        *              user-defined input formats, or any libraries, those 
must be
-        *              provided in the JAR files.
-        * @return A remote environment that executes the program on a cluster.
-        */
-       public static PythonStreamExecutionEnvironment 
create_remote_execution_environment(
-               String host, int port, int parallelism, String... jar_files) {
-               return new PythonStreamExecutionEnvironment(
-                                               
StreamExecutionEnvironment.createRemoteEnvironment(host, port, parallelism, 
jar_files));
-       }
+       private static void 
registerJythonSerializers(StreamExecutionEnvironment env) {
+               env.registerTypeWithKryoSerializer(PyBoolean.class, 
PyBooleanSerializer.class);
+               env.registerTypeWithKryoSerializer(PyFloat.class, 
PyFloatSerializer.class);
+               env.registerTypeWithKryoSerializer(PyInteger.class, 
PyIntegerSerializer.class);
+               env.registerTypeWithKryoSerializer(PyLong.class, 
PyLongSerializer.class);
 
-       private PythonStreamExecutionEnvironment(StreamExecutionEnvironment 
env) {
-               this.env = env;
-               this.registerJythonSerializers();
-       }
+               env.registerTypeWithKryoSerializer(PyString.class, 
PyStringSerializer.class);
+               env.registerTypeWithKryoSerializer(PyUnicode.class, 
PyObjectSerializer.class);
 
-       private void registerJythonSerializers() {
-               this.env.registerTypeWithKryoSerializer(PyString.class, 
PyObjectSerializer.class);
-               this.env.registerTypeWithKryoSerializer(PyInteger.class, 
PyObjectSerializer.class);
-               this.env.registerTypeWithKryoSerializer(PyLong.class, 
PyObjectSerializer.class);
-               this.env.registerTypeWithKryoSerializer(PyUnicode.class, 
PyObjectSerializer.class);
-               this.env.registerTypeWithKryoSerializer(PyTuple.class, 
PyObjectSerializer.class);
-               this.env.registerTypeWithKryoSerializer(PyObjectDerived.class, 
PyObjectSerializer.class);
-               this.env.registerTypeWithKryoSerializer(PyInstance.class, 
PyObjectSerializer.class);
+               env.registerTypeWithKryoSerializer(PyTuple.class, 
PyObjectSerializer.class);
+               env.registerTypeWithKryoSerializer(PyObjectDerived.class, 
PyObjectSerializer.class);
+               env.registerTypeWithKryoSerializer(PyInstance.class, 
PyObjectSerializer.class);
        }
 
        public PythonDataStream create_python_source(SourceFunction<Object> 
src) throws Exception {
-               return new PythonDataStream<>(env.addSource(new 
PythonGeneratorFunction(src)).map(new UtilityFunctions.SerializerMap<>()));
+               return new PythonDataStream<>(env.addSource(new 
PythonGeneratorFunction(src)).map(new AdapterMap<>()));
        }
 
        /**
         * Add a java source to the streaming topology. The source expected to 
be an java based
         * implementation (.e.g. Kafka connector).
         *
-        * @param src  A native java source (e.g. PythonFlinkKafkaConsumer09)
+        * @param src A native java source (e.g. PythonFlinkKafkaConsumer09)
         * @return Python data stream
         */
        public PythonDataStream add_java_source(SourceFunction<Object> src) {
-               return new PythonDataStream<>(env.addSource(src).map(new 
UtilityFunctions.SerializerMap<>()));
+               return new PythonDataStream<>(env.addSource(src).map(new 
AdapterMap<>()));
        }
 
        /**
-        * A thin wrapper layer over {@link 
StreamExecutionEnvironment#fromElements(java.lang.Object[])}
+        * A thin wrapper layer over {@link 
StreamExecutionEnvironment#fromElements(java.lang.Object[])}.
         *
-        * @param elements
-        *              The array of PyObject elements to create the data 
stream from.
+        * @param elements The array of PyObject elements to create the data 
stream from.
         * @return The data stream representing the given array of elements
         */
        public PythonDataStream from_elements(PyObject... elements) {
@@ -230,13 +153,11 @@ public class PythonStreamExecutionEnvironment {
         * <p>The input {@code Collection} is of type {@code Object}, because 
it is a collection
         * of Python elements. * There type is determined in runtime, by the 
Jython framework.</p>
         *
-        * @param collection
-        *              The collection of python elements to create the data 
stream from.
-        * @return
-        *     The data stream representing the given collection
+        * @param collection The collection of python elements to create the 
data stream from.
+        * @return The data stream representing the given collection
         */
        public PythonDataStream from_collection(Collection<Object> collection) {
-               return new 
PythonDataStream<>(env.fromCollection(collection).map(new 
UtilityFunctions.SerializerMap<>()));
+               return new 
PythonDataStream<>(env.fromCollection(collection).map(new AdapterMap<>()));
        }
 
        /**
@@ -245,54 +166,48 @@ public class PythonStreamExecutionEnvironment {
         * <p>Note that this operation will result in a non-parallel data 
stream source, i.e.,
         * a data stream source with a parallelism of one.</p>
         *
-        * @param iter
-        *              The iterator of elements to create the data stream from
+        * @param iter The iterator of elements to create the data stream from
         * @return The data stream representing the elements in the iterator
         * @see StreamExecutionEnvironment#fromCollection(java.util.Iterator, 
org.apache.flink.api.common.typeinfo.TypeInformation)
         */
-       public PythonDataStream from_collection(Iterator<Object> iter) throws 
Exception  {
+       public PythonDataStream from_collection(Iterator<Object> iter) throws 
Exception {
                return new PythonDataStream<>(env.addSource(new 
PythonIteratorFunction(iter), TypeExtractor.getForClass(Object.class))
-                       .map(new UtilityFunctions.SerializerMap<>()));
+                       .map(new AdapterMap<>()));
        }
 
        /**
         * A thin wrapper layer over {@link 
StreamExecutionEnvironment#generateSequence(long, long)}.
         *
-        * @param from
-        *              The number to start at (inclusive)
-        * @param to
-        *              The number to stop at (inclusive)
+        * @param from The number to start at (inclusive)
+        * @param to The number to stop at (inclusive)
         * @return A python data stream, containing all number in the [from, 
to] interval
         */
        public PythonDataStream generate_sequence(long from, long to) {
-               return new PythonDataStream<>(env.generateSequence(from, 
to).map(new UtilityFunctions.SerializerMap<Long>()));
+               return new PythonDataStream<>(env.generateSequence(from, 
to).map(new AdapterMap<>()));
        }
 
        /**
         * A thin wrapper layer over {@link 
StreamExecutionEnvironment#readTextFile(java.lang.String)}.
         *
-        * @param path
-        *              The path of the file, as a URI (e.g., 
"file:///some/local/file" or "hdfs://host:port/file/path").
+        * @param path The path of the file, as a URI (e.g., 
"file:///some/local/file" or "hdfs://host:port/file/path").
         * @return The data stream that represents the data read from the given 
file as text lines
         * @throws IOException
         */
 
        public PythonDataStream read_text_file(String path) throws IOException {
-               return new PythonDataStream<>(env.readTextFile(path).map(new 
UtilityFunctions.SerializerMap<String>()));
+               return new PythonDataStream<>(env.readTextFile(path).map(new 
AdapterMap<String>()));
        }
 
        /**
         * A thin wrapper layer over {@link 
StreamExecutionEnvironment#socketTextStream(java.lang.String, int)}.
         *
-        * @param host
-        *              The host name which a server socket binds
-        * @param port
-        *              The port number which a server socket binds. A port 
number of 0 means that the port number is automatically
-        *              allocated.
+        * @param host The host name which a server socket binds
+        * @param port The port number which a server socket binds. A port 
number of 0 means that the port number is automatically
+        * allocated.
         * @return A python data stream containing the strings received from 
the socket
         */
        public PythonDataStream socket_text_stream(String host, int port) {
-               return new PythonDataStream<>(env.socketTextStream(host, 
port).map(new UtilityFunctions.SerializerMap<String>()));
+               return new PythonDataStream<>(env.socketTextStream(host, 
port).map(new AdapterMap<String>()));
        }
 
        /**
@@ -310,8 +225,7 @@ public class PythonStreamExecutionEnvironment {
         * A thin wrapper layer over {@link 
StreamExecutionEnvironment#enableCheckpointing(long, CheckpointingMode)}.
         *
         * @param interval Time interval between state checkpoints in 
milliseconds.
-        * @param mode
-        *             The checkpointing mode, selecting between "exactly once" 
and "at least once" guaranteed.
+        * @param mode The checkpointing mode, selecting between "exactly once" 
and "at least once" guaranteed.
         * @return The same {@code PythonStreamExecutionEnvironment} instance 
of the caller
         */
        public PythonStreamExecutionEnvironment enable_checkpointing(long 
interval, CheckpointingMode mode) {
@@ -336,24 +250,7 @@ public class PythonStreamExecutionEnvironment {
         * @return The result of the job execution
         */
        public JobExecutionResult execute() throws Exception {
-               return execute(false);
-       }
-
-       /**
-        * A thin wrapper layer over {@link 
StreamExecutionEnvironment#execute()}.
-        *
-        * <p>In addition, it enables the caller to provide a hint about the 
execution mode - whether it is local
-        * or remote. In the case of local execution, the relevant cached files 
are distributed using the
-        * local machine temporary folder, otherwise a shared storage medium is 
used for this purpose.</p>
-        *
-        * @return The result of the job execution
-        */
-       public JobExecutionResult execute(Boolean local) throws Exception {
-               if (PythonEnvironmentConfig.pythonTmpCachePath == null) {
-                       // Nothing to be done! Is is executed on the task 
manager.
-                       return new JobExecutionResult(null, 0, null);
-               }
-               distributeFiles(local);
+               distributeFiles();
                JobExecutionResult result = this.env.execute();
                cleanupDistributedFiles();
                return result;
@@ -366,51 +263,30 @@ public class PythonStreamExecutionEnvironment {
         * @throws Exception which occurs during job execution.
         */
        public JobExecutionResult execute(String job_name) throws Exception {
-               return execute(job_name, false);
-       }
-
-       /**
-        * A thin wrapper layer over {@link 
StreamExecutionEnvironment#execute(java.lang.String).
-        *
-        * <p>In addition, it enables the caller to provide a hint about the 
execution mode - whether it is local
-        * or remote. In the case of local execution, the relevant cached files 
are distributed using the
-        * local machine temporary folder, otherwise a shared storage medium is 
used for this purpose.</p>
-        *
-        * @return The result of the job execution, containing elapsed time and 
accumulators.
-        * @throws Exception which occurs during job execution.
-        */
-       public JobExecutionResult execute(String job_name, Boolean local) 
throws Exception {
-               if (PythonEnvironmentConfig.pythonTmpCachePath == null) {
-                       // Nothing to be done! Is is executed on the task 
manager.
-                       return new JobExecutionResult(null, 0, null);
-               }
-               distributeFiles(local);
+               distributeFiles();
                JobExecutionResult result = this.env.execute(job_name);
                cleanupDistributedFiles();
                return result;
        }
 
-       private void distributeFiles(boolean local) throws IOException, 
URISyntaxException
-       {
-               String rootDir;
-               if (local || this.env instanceof LocalStreamEnvironment) {
-                       rootDir = System.getProperty("java.io.tmpdir");
+       private void distributeFiles() throws IOException {
+               Path remoteRootDir;
+               if (this.env instanceof LocalStreamEnvironment) {
+                       remoteRootDir = new 
Path(PythonOptions.DC_TMP_DIR.defaultValue());
                } else {
-                       rootDir = "hdfs:///tmp";
+                       remoteRootDir = tmpDistributedDir;
                }
-               PythonEnvironmentConfig.FLINK_HDFS_PATH = Paths.get(rootDir, 
"flink_cache_" +
-                       (new 
Random(System.currentTimeMillis())).nextLong()).toString();
+               Path remoteDir = new Path(remoteRootDir, "flink_cache_" + 
UUID.randomUUID());
 
-               FileCache.copy(new 
Path(PythonEnvironmentConfig.pythonTmpCachePath),
-                       new Path(PythonEnvironmentConfig.FLINK_HDFS_PATH), 
true);
+               FileCache.copy(pythonTmpCachePath, remoteDir, true);
 
-               
this.env.registerCachedFile(PythonEnvironmentConfig.FLINK_HDFS_PATH, 
PythonEnvironmentConfig.FLINK_PYTHON_DC_ID);
+               this.env.registerCachedFile(remoteDir.toUri().toString(), 
PythonConstants.FLINK_PYTHON_DC_ID);
        }
 
-       private void cleanupDistributedFiles() throws IOException, 
URISyntaxException {
+       private void cleanupDistributedFiles() throws IOException {
                for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : 
this.env.getCachedFiles()) {
-                       URI fileUri = new URI(e.f1.filePath);
-                       FileSystem fs = FileSystem.get(fileUri);
+                       Path fileUri = new Path(e.f1.filePath);
+                       FileSystem fs = fileUri.getFileSystem();
                        LOG.debug(String.format("Cleaning up cached path: %s, 
uriPath: %s, fileSystem: %s",
                                e.f1.filePath,
                                fileUri.getPath(),

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/AbstractPythonUDF.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/AbstractPythonUDF.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/AbstractPythonUDF.java
new file mode 100644
index 0000000..ca8ea04
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/AbstractPythonUDF.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.python.util.InterpreterUtils;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.python.core.PyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+/**
+ * Generic base-class for wrappers of python functions implementing the {@link Function} interface.
+ */
+public class AbstractPythonUDF<F extends Function> extends 
AbstractRichFunction {
+       protected Logger log = LoggerFactory.getLogger(AbstractPythonUDF.class);
+       private final byte[] serFun;
+       protected transient F fun;
+
+       AbstractPythonUDF(F fun) throws IOException {
+               this.serFun = SerializationUtils.serializeObject(fun);
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               this.fun = 
InterpreterUtils.deserializeFunction(getRuntimeContext(), this.serFun);
+               if (this.fun instanceof RichFunction) {
+                       try {
+                               final RichFunction rf = (RichFunction) this.fun;
+                               rf.setRuntimeContext(getRuntimeContext());
+                               rf.open(parameters);
+                       } catch (PyException pe) {
+                               throw createAndLogException(pe);
+                       }
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (this.fun instanceof RichFunction) {
+                       try {
+                               ((RichFunction) this.fun).close();
+                       } catch (PyException pe) {
+                               throw createAndLogException(pe);
+                       }
+               }
+       }
+
+       FlinkException createAndLogException(PyException pe) {
+               return createAndLogException(pe, log);
+       }
+
+       static FlinkException createAndLogException(PyException pe, Logger log) 
{
+               StringWriter sw = new StringWriter();
+               try (PrintWriter pw = new PrintWriter(sw)) {
+                       pe.printStackTrace(pw);
+               }
+               String pythonStackTrace = sw.toString().trim();
+
+               log.error("Python function failed: " + System.lineSeparator() + 
pythonStackTrace);
+               return new FlinkException("Python function failed: " + 
pythonStackTrace);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java
index 52ac4ff..964ef2c 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.python.api.functions;
 
+package org.apache.flink.streaming.python.api.functions;
 
 /**
  * A key type used by {@link 
org.apache.flink.streaming.python.api.functions.PythonKeySelector} to
@@ -30,7 +30,6 @@ public class PyKey {
                this.data = data;
        }
 
-
        public Object getData() {
                return data;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java
index 8fe3245..bf883b0 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java
@@ -15,15 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.functions;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.python.util.PythonCollector;
-import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
 import org.apache.flink.util.Collector;
+
+import org.python.core.PyException;
 import org.python.core.PyObject;
 
 import java.io.IOException;
@@ -37,41 +38,28 @@ import java.io.IOException;
  * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
  * functionality</p>
  */
-public class PythonApplyFunction<W extends Window> extends 
RichWindowFunction<PyObject, PyObject, PyKey, W> {
+public class PythonApplyFunction<W extends Window> extends 
AbstractPythonUDF<WindowFunction<PyObject, Object, Object, W>> implements 
WindowFunction<PyObject, PyObject, PyKey, W> {
        private static final long serialVersionUID = 577032239468987781L;
 
-       private final byte[] serFun;
-       private transient WindowFunction<PyObject, PyObject, Object, Window> 
fun;
        private transient PythonCollector collector;
 
-       public PythonApplyFunction(WindowFunction<PyObject, PyObject, Object, 
W> fun) throws IOException {
-               this.serFun = SerializationUtils.serializeObject(fun);
+       public PythonApplyFunction(WindowFunction<PyObject, Object, Object, W> 
fun) throws IOException {
+               super(fun);
        }
 
        @Override
-       @SuppressWarnings("unchecked")
-       public void open(Configuration parameters) throws Exception {
-               this.fun =
-                       (WindowFunction<PyObject, PyObject, Object, Window>) 
UtilityFunctions.smartFunctionDeserialization(
-                       getRuntimeContext(), this.serFun);
-               if (this.fun instanceof RichWindowFunction) {
-                       final RichWindowFunction winFun = 
(RichWindowFunction)this.fun;
-                       winFun.setRuntimeContext(getRuntimeContext());
-                       winFun.open(parameters);
-               }
+       public void open(Configuration config) throws Exception {
+               super.open(config);
                this.collector = new PythonCollector();
        }
 
        @Override
-       public void close() throws Exception {
-               if (this.fun instanceof RichWindowFunction) {
-                       ((RichWindowFunction)this.fun).close();
-               }
-       }
-
-       @Override
        public void apply(PyKey key, W window, Iterable<PyObject> values, 
Collector<PyObject> out) throws Exception {
                this.collector.setCollector(out);
-               this.fun.apply(key.getData(), window, values, this.collector);
+               try {
+                       this.fun.apply(key.getData(), window, values, 
this.collector);
+               } catch (PyException pe) {
+                       throw createAndLogException(pe);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java
index 345fdcc..fab7ceb 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java
@@ -15,18 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.functions;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+
+import org.python.core.PyException;
 import org.python.core.PyObject;
 
 import java.io.IOException;
 
-
 /**
  * The {@code PythonFilterFunction} is a thin wrapper layer over a Python UDF 
{@code FilterFunction}.
  * It receives a {@code FilterFunction} as an input and keeps it internally in 
a serialized form.
@@ -36,37 +34,19 @@ import java.io.IOException;
  * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
  * functionality</p>
  */
-public class PythonFilterFunction extends RichFilterFunction<PyObject> {
+public class PythonFilterFunction extends 
AbstractPythonUDF<FilterFunction<PyObject>> implements FilterFunction<PyObject> 
{
        private static final long serialVersionUID = 775688642701399472L;
 
-       private final byte[] serFun;
-       private transient FilterFunction<PyObject> fun;
-
        public PythonFilterFunction(FilterFunction<PyObject> fun) throws 
IOException {
-               this.serFun = SerializationUtils.serializeObject(fun);
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public void open(Configuration parameters) throws Exception {
-               this.fun = (FilterFunction<PyObject>) 
UtilityFunctions.smartFunctionDeserialization(
-                       getRuntimeContext(), this.serFun);
-               if (this.fun instanceof RichFunction) {
-                       final RichFilterFunction filterFun = 
(RichFilterFunction)this.fun;
-                       filterFun.setRuntimeContext(getRuntimeContext());
-                       filterFun.open(parameters);
-               }
-       }
-
-       @Override
-       public void close() throws Exception {
-               if (this.fun instanceof RichFunction) {
-                       ((RichFilterFunction)this.fun).close();
-               }
+               super(fun);
        }
 
        @Override
        public boolean filter(PyObject value) throws Exception {
-               return this.fun.filter(value);
+               try {
+                       return this.fun.filter(value);
+               } catch (PyException pe) {
+                       throw createAndLogException(pe);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java
index 21554c7..a768bff 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java
@@ -15,20 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.functions;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.python.util.PythonCollector;
-import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
 import org.apache.flink.util.Collector;
+
+import org.python.core.PyException;
 import org.python.core.PyObject;
 
 import java.io.IOException;
 
-
 /**
  * The {@code PythonFlatMapFunction} is a thin wrapper layer over a Python UDF 
{@code FlatMapFunction}.
  * It receives a {@code FlatMapFunction} as an input and keeps it internally 
in a serialized form.
@@ -38,41 +37,28 @@ import java.io.IOException;
  * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
  * functionality</p>
  */
-public class PythonFlatMapFunction extends RichFlatMapFunction<PyObject, 
PyObject> {
+public class PythonFlatMapFunction extends 
AbstractPythonUDF<FlatMapFunction<PyObject, Object>> implements 
FlatMapFunction<PyObject, PyObject> {
        private static final long serialVersionUID = -6098432222172956477L;
 
-       private final byte[] serFun;
-       private transient FlatMapFunction<PyObject, PyObject> fun;
        private transient PythonCollector collector;
 
-       public PythonFlatMapFunction(FlatMapFunction<PyObject, PyObject> fun) 
throws IOException {
-               this.serFun = SerializationUtils.serializeObject(fun);
+       public PythonFlatMapFunction(FlatMapFunction<PyObject, Object> fun) 
throws IOException {
+               super(fun);
        }
 
        @Override
-       @SuppressWarnings("unchecked")
        public void open(Configuration config) throws Exception {
-               this.fun =
-                       (FlatMapFunction<PyObject, PyObject>) 
UtilityFunctions.smartFunctionDeserialization(
-                               getRuntimeContext(), serFun);
-               if (this.fun instanceof RichFunction) {
-                       final RichFlatMapFunction flatMapFun = 
(RichFlatMapFunction)this.fun;
-                       flatMapFun.setRuntimeContext(getRuntimeContext());
-                       flatMapFun.open(config);
-               }
+               super.open(config);
                this.collector = new PythonCollector();
        }
 
        @Override
-       public void close() throws Exception {
-               if (this.fun instanceof RichFunction) {
-                       ((RichFlatMapFunction)this.fun).close();
-               }
-       }
-
-       @Override
        public void flatMap(PyObject value, Collector<PyObject> out) throws 
Exception {
                this.collector.setCollector(out);
-               this.fun.flatMap(value, this.collector);
+               try {
+                       this.fun.flatMap(value, this.collector);
+               } catch (PyException pe) {
+                       throw createAndLogException(pe);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java
index 70f212e..196edb5 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.functions;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+
+import org.python.core.PyException;
 
 import java.io.IOException;
 
@@ -33,33 +33,28 @@ import java.io.IOException;
  * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
  * functionality</p>
  */
-public class PythonGeneratorFunction extends RichSourceFunction<Object> {
+public class PythonGeneratorFunction extends 
AbstractPythonUDF<SourceFunction<Object>> implements SourceFunction<Object> {
        private static final long serialVersionUID = 3854587935845323082L;
 
-       private final byte[] serFun;
-       private transient SourceFunction<Object> fun;
-
        public PythonGeneratorFunction(SourceFunction<Object> fun) throws 
IOException {
-               this.serFun = SerializationUtils.serializeObject(fun);
+               super(fun);
        }
 
-       @Override
-       @SuppressWarnings("unchecked")
-       public void open(Configuration parameters) throws Exception {
-               this.fun = (SourceFunction<Object>) 
UtilityFunctions.smartFunctionDeserialization(
-                       getRuntimeContext(), this.serFun);
-       }
-
-       @Override
-       public void close() throws Exception {}
-
        public void run(SourceContext<Object> ctx) throws Exception {
-               this.fun.run(ctx);
+               try {
+                       this.fun.run(ctx);
+               } catch (PyException pe) {
+                       throw createAndLogException(pe);
+               }
        }
 
        public void cancel() {
                if (this.fun != null) {
-                       this.fun.cancel();
+                       try {
+                               this.fun.cancel();
+                       } catch (PyException pe) {
+                               createAndLogException(pe);
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java
index 38bf175..709fe19 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java
@@ -15,14 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.functions;
 
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.python.util.InterpreterUtils;
 import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
 
-import java.util.Iterator;
+import org.python.core.PyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
+import java.util.Iterator;
 
 /**
  * The {@code PythonIteratorFunction} is a thin wrapper layer over a Python 
UDF {@code Iterator}.
@@ -35,6 +42,7 @@ import java.io.IOException;
  */
 public class PythonIteratorFunction extends RichSourceFunction<Object> {
        private static final long serialVersionUID = 6741748297048588334L;
+       private static final Logger LOG = 
LoggerFactory.getLogger(PythonIteratorFunction.class);
 
        private final byte[] serFun;
        private transient Iterator<Object> fun;
@@ -45,20 +53,38 @@ public class PythonIteratorFunction extends 
RichSourceFunction<Object> {
        }
 
        @Override
-       @SuppressWarnings("unchecked")
        public void open(Configuration parameters) throws Exception {
-               this.isRunning = true;
-               this.fun = (Iterator<Object>) 
UtilityFunctions.smartFunctionDeserialization(getRuntimeContext(), this.serFun);
+               this.fun = 
InterpreterUtils.deserializeFunction(getRuntimeContext(), this.serFun);
+               if (this.fun instanceof RichFunction) {
+                       try {
+                               final RichFunction winFun = (RichFunction) 
this.fun;
+                               winFun.setRuntimeContext(getRuntimeContext());
+                               winFun.open(parameters);
+                       } catch (PyException pe) {
+                               throw 
AbstractPythonUDF.createAndLogException(pe, LOG);
+                       }
+               }
        }
 
        @Override
-       public void close() throws Exception {
+       public void run(SourceContext<Object> ctx) throws Exception {
+               try {
+                       while (isRunning && this.fun.hasNext()) {
+                               ctx.collect(this.fun.next());
+                       }
+               } catch (PyException pe) {
+                       throw AbstractPythonUDF.createAndLogException(pe, LOG);
+               }
        }
 
        @Override
-       public void run(SourceContext<Object> ctx) throws Exception {
-               while (isRunning && this.fun.hasNext()) {
-                       ctx.collect(this.fun.next());
+       public void close() throws Exception {
+               if (this.fun instanceof RichFunction) {
+                       try {
+                               ((RichFunction) this.fun).close();
+                       } catch (PyException pe) {
+                               throw 
AbstractPythonUDF.createAndLogException(pe, LOG);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java
index 7cdaacb..03a49be 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java
@@ -15,11 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.functions;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+
+import org.python.core.PyException;
 import org.python.core.PyObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -34,6 +39,8 @@ import java.io.IOException;
  */
 public class PythonKeySelector implements KeySelector<PyObject, PyKey> {
        private static final long serialVersionUID = 7403775239671366607L;
+       private static final Logger LOG = 
LoggerFactory.getLogger(PythonKeySelector.class);
+
        private final byte[] serFun;
        private transient KeySelector<PyObject, Object> fun;
 
@@ -45,9 +52,13 @@ public class PythonKeySelector implements 
KeySelector<PyObject, PyKey> {
        @SuppressWarnings("unchecked")
        public PyKey getKey(PyObject value) throws Exception {
                if (fun == null) {
-                       fun = (KeySelector<PyObject, Object>) 
SerializationUtils.deserializeObject(serFun);
+                       fun = SerializationUtils.deserializeObject(serFun);
+               }
+               try {
+                       Object key = fun.getKey(value);
+                       return new PyKey(key);
+               } catch (PyException pe) {
+                       throw AbstractPythonUDF.createAndLogException(pe, LOG);
                }
-               Object key = fun.getKey(value);
-               return new PyKey(key);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java
index 24d19da..bef3fcb 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java
@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.functions;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.apache.flink.streaming.python.util.AdapterMap;
+
+import org.python.core.PyException;
 import org.python.core.PyObject;
 
 import java.io.IOException;
@@ -35,36 +35,19 @@ import java.io.IOException;
  * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
  * functionality</p>
  */
-public class PythonMapFunction extends RichMapFunction<PyObject, PyObject> {
+public class PythonMapFunction extends AbstractPythonUDF<MapFunction<PyObject, 
PyObject>> implements MapFunction<PyObject, PyObject> {
        private static final long serialVersionUID = 3001212087036451818L;
-       private final byte[] serFun;
-       private transient MapFunction<PyObject, PyObject> fun;
 
        public PythonMapFunction(MapFunction<PyObject, PyObject> fun) throws 
IOException {
-               this.serFun = SerializationUtils.serializeObject(fun);
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public void open(Configuration config) throws Exception {
-               this.fun = (MapFunction<PyObject, PyObject>) 
UtilityFunctions.smartFunctionDeserialization(
-                       getRuntimeContext(), serFun);
-               if (this.fun instanceof RichFunction) {
-                       final RichMapFunction mapFun = 
(RichMapFunction)this.fun;
-                       mapFun.setRuntimeContext(getRuntimeContext());
-                       mapFun.open(config);
-               }
-       }
-
-       @Override
-       public void close() throws Exception {
-               if (this.fun instanceof RichFunction) {
-                       ((RichMapFunction)this.fun).close();
-               }
+               super(fun);
        }
 
        @Override
        public PyObject map(PyObject value) throws Exception {
-               return UtilityFunctions.adapt(this.fun.map(value));
+               try {
+                       return AdapterMap.adapt(fun.map(value));
+               } catch (PyException pe) {
+                       throw createAndLogException(pe);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
index f022322..3e4759e 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java
@@ -15,11 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.functions;
 
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.python.core.PyException;
 import org.python.core.PyObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -34,6 +40,7 @@ import java.io.IOException;
  */
 public class PythonOutputSelector implements OutputSelector<PyObject> {
        private static final long serialVersionUID = 909266346633598177L;
+       private static final Logger LOG = 
LoggerFactory.getLogger(PythonOutputSelector.class);
 
        private final byte[] serFun;
        private transient OutputSelector<PyObject> fun;
@@ -43,20 +50,18 @@ public class PythonOutputSelector implements 
OutputSelector<PyObject> {
        }
 
        @Override
-       @SuppressWarnings("unchecked")
        public Iterable<String> select(PyObject value) {
                if (this.fun == null) {
                        try {
-                               this.fun = (OutputSelector<PyObject>) 
SerializationUtils.deserializeObject(this.serFun);
-                       } catch (IOException e) {
-                               e.printStackTrace();
-                       } catch (ClassNotFoundException e) {
-                               e.printStackTrace();
+                               fun = 
SerializationUtils.deserializeObject(serFun);
+                       } catch (Exception e) {
+                               throw new FlinkRuntimeException("Failed to 
deserialize user-defined function.", e);
                        }
                }
-               if (this.fun == null) {
-                       return null;
+               try {
+                       return this.fun.select(value);
+               } catch (PyException pe) {
+                       throw new 
FlinkRuntimeException(AbstractPythonUDF.createAndLogException(pe, LOG));
                }
-               return this.fun.select(value);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java
index d02240c..1fa071d 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java
@@ -15,12 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.functions;
 
-import java.io.IOException;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.python.util.AdapterMap;
 import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+
+import org.python.core.PyException;
 import org.python.core.PyObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 
 /**
  * The {@code PythonReduceFunction} is a thin wrapper layer over a Python UDF 
{@code ReduceFunction}.
@@ -33,6 +40,7 @@ import org.python.core.PyObject;
  */
 public class PythonReduceFunction implements ReduceFunction<PyObject> {
        private static final long serialVersionUID = -9070596504893036458L;
+       private static final Logger LOG = 
LoggerFactory.getLogger(PythonReduceFunction.class);
 
        private final byte[] serFun;
        private transient ReduceFunction<PyObject> fun;
@@ -42,12 +50,14 @@ public class PythonReduceFunction implements 
ReduceFunction<PyObject> {
        }
 
        @Override
-       @SuppressWarnings("unchecked")
        public PyObject reduce(PyObject value1, PyObject value2) throws 
Exception {
                if (fun == null) {
-                       fun = (ReduceFunction<PyObject>) 
SerializationUtils.deserializeObject(serFun);
+                       fun = SerializationUtils.deserializeObject(serFun);
+               }
+               try {
+                       return AdapterMap.adapt(this.fun.reduce(value1, 
value2));
+               } catch (PyException pe) {
+                       throw AbstractPythonUDF.createAndLogException(pe, LOG);
                }
-
-               return UtilityFunctions.adapt(this.fun.reduce(value1, value2));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java
index e0351b0..f860e0b 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java
@@ -15,14 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.api.functions;
 
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+
+import org.python.core.PyException;
 import org.python.core.PyObject;
 
 import java.io.IOException;
@@ -36,37 +34,20 @@ import java.io.IOException;
  * <p>This function is used internally by the Python thin wrapper layer over 
the streaming data
  * functionality</p>
  */
-public class PythonSinkFunction extends RichSinkFunction<PyObject> {
+public class PythonSinkFunction extends 
AbstractPythonUDF<SinkFunction<PyObject>> implements SinkFunction<PyObject> {
+
        private static final long serialVersionUID = -9030596504893036458L;
-       private final byte[] serFun;
-       private transient SinkFunction<PyObject> fun;
-       private transient RuntimeContext context;
 
        public PythonSinkFunction(SinkFunction<PyObject> fun) throws 
IOException {
-               this.serFun = SerializationUtils.serializeObject(fun);
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public void open(Configuration parameters) throws Exception {
-               this.fun = (SinkFunction<PyObject>) 
UtilityFunctions.smartFunctionDeserialization(
-                       getRuntimeContext(), this.serFun);
-               if (this.fun instanceof RichFunction) {
-                       final RichSinkFunction sinkFunction = 
(RichSinkFunction)this.fun;
-                       sinkFunction.setRuntimeContext(getRuntimeContext());
-                       sinkFunction.open(parameters);
-               }
+               super(fun);
        }
 
        @Override
-       public void close() throws Exception {
-               if (this.fun instanceof RichFunction) {
-                       ((RichSinkFunction)this.fun).close();
+       public void invoke(PyObject value, Context context) throws Exception {
+               try {
+                       this.fun.invoke(value, context);
+               } catch (PyException pe) {
+                       throw createAndLogException(pe);
                }
        }
-
-       @Override
-       public void invoke(PyObject value) throws Exception {
-               this.fun.invoke(value);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java
deleted file mode 100644
index 0417a43..0000000
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.python.api.functions;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import 
org.apache.flink.streaming.python.api.environment.PythonEnvironmentConfig;
-import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
-import org.python.core.Py;
-import org.python.core.PyObject;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.python.core.PySystemState;
-import org.python.util.PythonInterpreter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * A collection of utility functions that are used by the python wrappers thin 
layer over
- * the streaming functions.
- */
-public class UtilityFunctions {
-       private static boolean jythonInitialized = false;
-       private static PythonInterpreter pythonInterpreter;
-       private static final Logger LOG = 
LoggerFactory.getLogger(UtilityFunctions.class);
-
-       private UtilityFunctions() {
-       }
-
-       /**
-        * A generic map operator that convert any java type to PyObject. It is 
mainly used to convert elements
-        * collected from a source functions, to PyObject objects.
-        *
-        * @param <IN> Any given java object
-        */
-       public static class SerializerMap<IN> implements MapFunction<IN, 
PyObject> {
-               private static final long serialVersionUID = 
1582769662549499373L;
-
-               @Override
-               public PyObject map(IN value) throws Exception {
-                       return UtilityFunctions.adapt(value);
-               }
-       }
-
-       /**
-        * Convert java object to its corresponding PyObject representation.
-        *
-        * @param o Java object
-        * @return PyObject
-        */
-       public static PyObject adapt(Object o) {
-               if (o instanceof PyObject) {
-                       return (PyObject)o;
-               }
-               return  Py.java2py(o);
-       }
-
-       /**
-        * Deserializes Python UDF functions in a "smart" way. It first tries 
to extract the function using a common
-        * java method to deserialize an object. If it fails, it is assumed 
that the function object definition does not
-        * exist, and therefore, a Jython interpreter is initialized and the 
relevant python script is executed (without
-        * actually executing the 'execute' function). This result from this 
operation is that all the Python UDF objects
-        * that are defined in that python script are also defined in the local 
JVM. Is is now possible to extract
-        * the function using a common java method to deserialize the java 
function.
-        *
-        * @param runtimeCtx The runtime context of the executed job.
-        * @param serFun A serialized form of the given Python UDF
-        * @return An extracted java function
-        * @throws IOException
-        * @throws ClassNotFoundException
-        * @throws InterruptedException
-        */
-       public static synchronized Object 
smartFunctionDeserialization(RuntimeContext runtimeCtx, byte[] serFun) throws 
IOException, ClassNotFoundException, InterruptedException {
-               try {
-                       return SerializationUtils.deserializeObject(serFun);
-               } catch (Exception e) {
-                       String path = 
runtimeCtx.getDistributedCache().getFile(PythonEnvironmentConfig.FLINK_PYTHON_DC_ID).getAbsolutePath();
-
-                       initPythonInterpreter(path, new String[]{""});
-
-                       PySystemState pySysStat = Py.getSystemState();
-                       pySysStat.path.add(0, path);
-
-                       String scriptFullPath = path + File.separator + 
PythonEnvironmentConfig.FLINK_PYTHON_PLAN_NAME;
-                       LOG.debug("Execute python script, path=" + 
scriptFullPath);
-                       pythonInterpreter.execfile(scriptFullPath);
-
-                       pySysStat.path.remove(path);
-
-                       return SerializationUtils.deserializeObject(serFun);
-               }
-       }
-
-       /**
-        * Initializes the Jython interpreter and executes a python script.
-        *
-        * @param scriptFullPath The script full path
-        * @param args Command line arguments that will be delivered to the 
executed python script
-        * @throws IOException
-        */
-       public static void initAndExecPythonScript(File scriptFullPath, 
String[] args) throws IOException {
-               initPythonInterpreter(scriptFullPath.getParent(), args);
-               pythonInterpreter.execfile(scriptFullPath.getAbsolutePath());
-
-               LOG.debug("Cleaning up temporary folder: " + 
scriptFullPath.getParent());
-               FileSystem fs = FileSystem.get(scriptFullPath.toURI());
-               fs.delete(new Path(scriptFullPath.getParent()), true);
-       }
-
-       private static synchronized void initPythonInterpreter(String 
pythonPath, String[] args) {
-               if (!jythonInitialized) {
-                       LOG.debug("Init python interpreter, path=" + 
pythonPath);
-                       Properties postProperties = new Properties();
-                       postProperties.put("python.path", pythonPath);
-                       PythonInterpreter.initialize(System.getProperties(), 
postProperties, args);
-
-                       pythonInterpreter = new PythonInterpreter();
-                       pythonInterpreter.setErr(System.err);
-                       pythonInterpreter.setOut(System.out);
-
-                       jythonInitialized = true;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java
deleted file mode 100644
index e6598a2..0000000
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.python.connectors;
-
-import 
org.apache.flink.streaming.python.util.serialization.PythonDeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Defines a generic implementation of the {@link FlinkKafkaConsumer09} class, 
by using a type
- * parameter of an {@code Object} class. The thin Python streaming layer 
converts the elements
- * that are read from the Kafka feed, to a {@code PyObject} for further 
handling.
- */
-public class PythonFlinkKafkaConsumer09 extends FlinkKafkaConsumer09<Object> {
-
-       public PythonFlinkKafkaConsumer09(String topic, 
DeserializationSchema<Object> schema, Properties props) throws IOException {
-               super(topic, new PythonDeserializationSchema(schema), props);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java
deleted file mode 100644
index b02b893..0000000
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.python.connectors;
-
-import 
org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.python.core.PyObject;
-
-import java.io.IOException;
-
-/**
- * Defines a generic Python implementation of {@link FlinkKafkaProducer09}, by 
using a type
- * parameter of {@code PyObject} class. It can be used as a sink in the python 
code.
- */
-public class PythonFlinkKafkaProducer09 extends FlinkKafkaProducer09<PyObject> 
{
-
-       public PythonFlinkKafkaProducer09(String brokerList, String topicId, 
SerializationSchema<PyObject> serializationSchema) throws IOException {
-               super(brokerList, topicId, new 
PythonSerializationSchema(serializationSchema));
-       }
-
-       public void set_log_failures_only(boolean logFailuresOnly) {
-               this.setLogFailuresOnly(logFailuresOnly);
-       }
-
-       public void set_flush_on_checkpoint(boolean flush) {
-               this.setFlushOnCheckpoint(flush);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/AdapterMap.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/AdapterMap.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/AdapterMap.java
new file mode 100644
index 0000000..3716375
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/AdapterMap.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.util;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+import org.python.core.Py;
+import org.python.core.PyObject;
+
+/**
+ * A generic map operator that convert any java type to PyObject. It is mainly 
used to convert elements
+ * collected from a source functions, to PyObject objects.
+ *
+ * @param <IN> Any given java object
+ */
+public class AdapterMap<IN> implements MapFunction<IN, PyObject> {
+       private static final long serialVersionUID = 1582769662549499373L;
+
+       /**
+        * Convert java object to its corresponding PyObject representation.
+        *
+        * @param o Java object
+        * @return PyObject
+        */
+       public static PyObject adapt(Object o) {
+               if (o instanceof PyObject) {
+                       return (PyObject) o;
+               }
+               return Py.java2py(o);
+       }
+
+       @Override
+       public PyObject map(IN value) throws Exception {
+               PyObject ret = adapt(value);
+               return ret;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/InterpreterUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/InterpreterUtils.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/InterpreterUtils.java
new file mode 100644
index 0000000..8c3520d
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/InterpreterUtils.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.util;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.python.api.environment.PythonConstants;
+import 
org.apache.flink.streaming.python.api.environment.PythonEnvironmentFactory;
+import 
org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment;
+import org.apache.flink.streaming.python.util.serialization.SerializationUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.python.core.PySystemState;
+import org.python.util.PythonInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+/**
+ * A collection of utility methods for interacting with jython.
+ *
+ * <p><strong>Important:</strong> This class loads the jython runtime which is 
essentially one big singleton. We make
+ * the core assumption here that this class is loaded by separate ClassLoaders 
for each job or task allowing multiple
+ * instances of jython to exist in the same JVM without affecting each other.
+ */
+public class InterpreterUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(InterpreterUtils.class);
+
+       private static PythonInterpreter pythonInterpreter = null;
+       private static boolean jythonInitialized = false;
+
+       private InterpreterUtils() {
+       }
+
+       /**
+        * Deserialize the given python function. If the functions class 
definition cannot be found we assume that this is
+        * the first invocation of this method for a given job and load the 
python script containing the class definition
+        * via jython.
+        *
+        * @param context the RuntimeContext of the java function
+        * @param serFun serialized python UDF
+        * @return deserialized python UDF
+        * @throws FlinkException if the deserialization failed
+        */
+       @SuppressWarnings("unchecked")
+       public static <X> X deserializeFunction(RuntimeContext context, byte[] 
serFun) throws FlinkException {
+               if (!jythonInitialized) {
+                       // This branch is only tested by end-to-end tests
+                       String path = 
context.getDistributedCache().getFile(PythonConstants.FLINK_PYTHON_DC_ID).getAbsolutePath();
+
+                       String scriptName = 
PythonStreamExecutionEnvironment.PythonJobParameters.getScriptName(context.getExecutionConfig().getGlobalJobParameters());
+
+                       try {
+                               initPythonInterpreter(
+                                       new String[]{Paths.get(path, 
scriptName).toString()},
+                                       path,
+                                       scriptName);
+                       } catch (Exception e) {
+                               LOG.error("Initialization of jython failed.");
+                               try {
+                                       LOG.error("Initialization of jython 
failed.", e);
+                                       throw new 
FlinkRuntimeException("Initialization of jython failed.", e);
+                               } catch (Exception ie) {
+                                       // this may occur if the initial 
exception relies on jython being initialized properly
+                                       LOG.error("Initialization of jython 
failed. Could not print original stacktrace.", ie);
+                                       throw new 
FlinkRuntimeException("Initialization of jython failed. Could not print 
original stacktrace.");
+                               }
+                       }
+               }
+
+               try {
+                       return (X) SerializationUtils.deserializeObject(serFun);
+               } catch (IOException | ClassNotFoundException ex) {
+                       throw new FlinkException("Deserialization of 
user-function failed.", ex);
+               }
+       }
+
+       /**
+        * Initializes the Jython interpreter and executes a python script.
+        *
+        * @param factory environment factory
+        * @param scriptDirectory the directory containing all required user 
python scripts
+        * @param scriptName the name of the main python script
+        * @param args Command line arguments that will be delivered to the 
executed python script
+        */
+       public static void initAndExecPythonScript(PythonEnvironmentFactory 
factory, java.nio.file.Path scriptDirectory, String scriptName, String[] args) {
+               String[] fullArgs = new String[args.length + 1];
+               fullArgs[0] = scriptDirectory.resolve(scriptName).toString();
+               System.arraycopy(args, 0, fullArgs, 1, args.length);
+
+               initPythonInterpreter(fullArgs, 
scriptDirectory.toUri().getPath(), scriptName);
+
+               PythonInterpreter pythonInterpreter = 
initPythonInterpreter(fullArgs, scriptDirectory.toUri().getPath(), scriptName);
+
+               pythonInterpreter.set("__flink_env_factory__", factory);
+               pythonInterpreter.exec(scriptName + 
".main(__flink_env_factory__)");
+       }
+
+       private static synchronized PythonInterpreter 
initPythonInterpreter(String[] args, String pythonPath, String scriptName) {
+               if (!jythonInitialized) {
+                       // the java stack traces within the jython runtime 
aren't useful for users
+                       
System.getProperties().put("python.options.includeJavaStackInExceptions", 
"false");
+                       PySystemState.initialize(System.getProperties(), new 
Properties(), args);
+
+                       pythonInterpreter = new PythonInterpreter();
+
+                       pythonInterpreter.getSystemState().path.add(0, 
pythonPath);
+
+                       pythonInterpreter.setErr(System.err);
+                       pythonInterpreter.setOut(System.out);
+
+                       pythonInterpreter.exec("import " + scriptName);
+                       jythonInitialized = true;
+               }
+               return pythonInterpreter;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
index 4caf668..702795f 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java
@@ -15,11 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.util;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
 import org.apache.flink.util.Collector;
+
 import org.python.core.PyObject;
 
 /**
@@ -27,7 +28,7 @@ import org.python.core.PyObject;
  * if necessary, to a {@code PyObject}.
  */
 @Public
-public class PythonCollector implements Collector<PyObject> {
+public class PythonCollector implements Collector<Object> {
        private Collector<PyObject> collector;
 
        public void setCollector(Collector<PyObject> collector) {
@@ -35,8 +36,8 @@ public class PythonCollector implements Collector<PyObject> {
        }
 
        @Override
-       public void collect(PyObject record) {
-               PyObject po = UtilityFunctions.adapt(record);
+       public void collect(Object record) {
+               PyObject po = AdapterMap.adapt(record);
                this.collector.collect(po);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java
index 55fbd69..f99f6d6 100644
--- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.python.util;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyBooleanSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyBooleanSerializer.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyBooleanSerializer.java
new file mode 100644
index 0000000..fcc3e91
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyBooleanSerializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyBoolean;
+
+/**
+ * A {@link Serializer} implementation for {@link PyBoolean} class type.
+ */
+public class PyBooleanSerializer extends Serializer<PyBoolean> {
+       @Override
+       public void write(Kryo kryo, Output output, PyBoolean object) {
+               output.writeBoolean(object.getBooleanValue());
+       }
+
+       @Override
+       public PyBoolean read(Kryo kryo, Input input, Class<PyBoolean> type) {
+               return new PyBoolean(input.readBoolean());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyFloatSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyFloatSerializer.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyFloatSerializer.java
new file mode 100644
index 0000000..5672ff6
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyFloatSerializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyFloat;
+
+/**
+ * A {@link Serializer} implementation for {@link PyFloat} class type.
+ */
+public class PyFloatSerializer extends Serializer<PyFloat> {
+       @Override
+       public void write(Kryo kryo, Output output, PyFloat object) {
+               output.writeDouble(object.getValue());
+       }
+
+       @Override
+       public PyFloat read(Kryo kryo, Input input, Class<PyFloat> type) {
+               return new PyFloat(input.readDouble());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyIntegerSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyIntegerSerializer.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyIntegerSerializer.java
new file mode 100644
index 0000000..a19eb83
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyIntegerSerializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyInteger;
+
+/**
+ * A {@link Serializer} implementation for {@link PyInteger} class type.
+ */
+public class PyIntegerSerializer extends Serializer<PyInteger> {
+       @Override
+       public void write(Kryo kryo, Output output, PyInteger object) {
+               output.writeInt(object.getValue());
+       }
+
+       @Override
+       public PyInteger read(Kryo kryo, Input input, Class<PyInteger> type) {
+               return new PyInteger(input.readInt());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyLongSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyLongSerializer.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyLongSerializer.java
new file mode 100644
index 0000000..7686a98
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyLongSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.util.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.python.core.PyLong;
+
+import java.math.BigInteger;
+
+/**
+ * A {@link Serializer} implementation for {@link PyLong} class type.
+ */
+public class PyLongSerializer extends Serializer<PyLong> {
+       @Override
+       public void write(Kryo kryo, Output output, PyLong object) {
+               byte[] data = object.getValue().toByteArray();
+               output.writeShort(data.length);
+               output.writeBytes(data);
+       }
+
+       @Override
+       public PyLong read(Kryo kryo, Input input, Class<PyLong> type) {
+               int length = input.readShort();
+               return new PyLong(new BigInteger(input.readBytes(length)));
+       }
+}

Reply via email to