http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java index 6710bf1..0684e35 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java @@ -15,14 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.streaming.python.api.environment; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.filecache.FileCache; @@ -30,30 +31,37 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.python.PythonOptions; import org.apache.flink.streaming.python.api.datastream.PythonDataStream; import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction; import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction; -import org.apache.flink.streaming.python.api.functions.UtilityFunctions; +import org.apache.flink.streaming.python.util.AdapterMap; +import org.apache.flink.streaming.python.util.serialization.PyBooleanSerializer; +import org.apache.flink.streaming.python.util.serialization.PyFloatSerializer; +import org.apache.flink.streaming.python.util.serialization.PyIntegerSerializer; +import org.apache.flink.streaming.python.util.serialization.PyLongSerializer; import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer; -import org.python.core.PyObject; -import org.python.core.PyString; +import org.apache.flink.streaming.python.util.serialization.PyStringSerializer; + +import org.python.core.PyBoolean; +import org.python.core.PyFloat; +import org.python.core.PyInstance; import org.python.core.PyInteger; import 
org.python.core.PyLong; -import org.python.core.PyUnicode; -import org.python.core.PyTuple; +import org.python.core.PyObject; import org.python.core.PyObjectDerived; -import org.python.core.PyInstance; +import org.python.core.PyString; +import org.python.core.PyTuple; +import org.python.core.PyUnicode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.file.Paths; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; -import java.util.Random; - +import java.util.Map; +import java.util.UUID; /** * A thin wrapper layer over {@link StreamExecutionEnvironment}. @@ -69,155 +77,70 @@ import java.util.Random; public class PythonStreamExecutionEnvironment { private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class); private final StreamExecutionEnvironment env; + private final Path pythonTmpCachePath; + private final Path tmpDistributedDir; - /** - * A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes - * care for required Jython serializers registration. - * - * @return The python execution environment of the context in which the program is - * executed. - */ - public static PythonStreamExecutionEnvironment get_execution_environment() { - return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment()); + PythonStreamExecutionEnvironment(StreamExecutionEnvironment env, Path tmpLocalDir, Path tmpDistributedDir, String scriptName) { + this.env = env; + this.pythonTmpCachePath = tmpLocalDir; + this.tmpDistributedDir = tmpDistributedDir; + env.getConfig().setGlobalJobParameters(new PythonJobParameters(scriptName)); + registerJythonSerializers(this.env); } /** - * Creates a {@link LocalStreamEnvironment}. 
The local execution environment - * will run the program in a multi-threaded fashion in the same JVM as the - * environment was created in. The default parallelism of the local - * environment is the number of hardware contexts (CPU cores / threads), - * unless it was specified differently by {@link #setParallelism(int)}. - * - * @param configuration - * Pass a custom configuration into the cluster - * @return A local execution environment with the specified parallelism. + * Utility class for storing/retrieving parameters in/from a {@link ExecutionConfig.GlobalJobParameters}. */ - public static PythonStreamExecutionEnvironment create_local_execution_environment(Configuration config) { - return new PythonStreamExecutionEnvironment(new LocalStreamEnvironment(config)); - } + public static class PythonJobParameters extends ExecutionConfig.GlobalJobParameters { + private static final String KEY_SCRIPT_NAME = "scriptName"; + private Map<String, String> parameters = new HashMap<>(); - /** - * A thin wrapper layer over {@link StreamExecutionEnvironment#createLocalEnvironment(int, Configuration)} - * - * @param parallelism - * The parallelism for the local environment. - * @param config - * Pass a custom configuration into the cluster - * @return A local python execution environment with the specified parallelism. - */ - public static PythonStreamExecutionEnvironment create_local_execution_environment(int parallelism, Configuration config) { - return new PythonStreamExecutionEnvironment( - StreamExecutionEnvironment.createLocalEnvironment(parallelism, config)); - } + PythonJobParameters(String scriptName) { + parameters.put(KEY_SCRIPT_NAME, scriptName); + } - /** - * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment(java.lang.String, int, java.lang.String...)} - * - * @param host - * The host name or address of the master (JobManager), where the - * program should be executed. 
- * @param port - * The port of the master (JobManager), where the program should - * be executed. - * @param jar_files - * The JAR files with code that needs to be shipped to the - * cluster. If the program uses user-defined functions, - * user-defined input formats, or any libraries, those must be - * provided in the JAR files. - * @return A remote environment that executes the program on a cluster. - */ - public static PythonStreamExecutionEnvironment create_remote_execution_environment( - String host, int port, String... jar_files) { - return new PythonStreamExecutionEnvironment( - StreamExecutionEnvironment.createRemoteEnvironment(host, port, jar_files)); - } + public Map<String, String> toMap() { + return parameters; + } - /** - * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment( - * java.lang.String, int, Configuration, java.lang.String...)} - * - * @param host - * The host name or address of the master (JobManager), where the - * program should be executed. - * @param port - * The port of the master (JobManager), where the program should - * be executed. - * @param config - * The configuration used by the client that connects to the remote cluster. - * @param jar_files - * The JAR files with code that needs to be shipped to the - * cluster. If the program uses user-defined functions, - * user-defined input formats, or any libraries, those must be - * provided in the JAR files. - * @return A remote environment that executes the program on a cluster. - * - */ - public static PythonStreamExecutionEnvironment create_remote_execution_environment( - String host, int port, Configuration config, String... 
jar_files) { - return new PythonStreamExecutionEnvironment( - StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jar_files)); + public static String getScriptName(ExecutionConfig.GlobalJobParameters parameters) { + return parameters.toMap().get(KEY_SCRIPT_NAME); + } } - /** - * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment( - * java.lang.String, int, int, java.lang.String...)} - * - * @param host - * The host name or address of the master (JobManager), where the - * program should be executed. - * @param port - * The port of the master (JobManager), where the program should - * be executed. - * @param parallelism - * The parallelism to use during the execution. - * @param jar_files - * The JAR files with code that needs to be shipped to the - * cluster. If the program uses user-defined functions, - * user-defined input formats, or any libraries, those must be - * provided in the JAR files. - * @return A remote environment that executes the program on a cluster. - */ - public static PythonStreamExecutionEnvironment create_remote_execution_environment( - String host, int port, int parallelism, String... 
jar_files) { - return new PythonStreamExecutionEnvironment( - StreamExecutionEnvironment.createRemoteEnvironment(host, port, parallelism, jar_files)); - } + private static void registerJythonSerializers(StreamExecutionEnvironment env) { + env.registerTypeWithKryoSerializer(PyBoolean.class, PyBooleanSerializer.class); + env.registerTypeWithKryoSerializer(PyFloat.class, PyFloatSerializer.class); + env.registerTypeWithKryoSerializer(PyInteger.class, PyIntegerSerializer.class); + env.registerTypeWithKryoSerializer(PyLong.class, PyLongSerializer.class); - private PythonStreamExecutionEnvironment(StreamExecutionEnvironment env) { - this.env = env; - this.registerJythonSerializers(); - } + env.registerTypeWithKryoSerializer(PyString.class, PyStringSerializer.class); + env.registerTypeWithKryoSerializer(PyUnicode.class, PyObjectSerializer.class); - private void registerJythonSerializers() { - this.env.registerTypeWithKryoSerializer(PyString.class, PyObjectSerializer.class); - this.env.registerTypeWithKryoSerializer(PyInteger.class, PyObjectSerializer.class); - this.env.registerTypeWithKryoSerializer(PyLong.class, PyObjectSerializer.class); - this.env.registerTypeWithKryoSerializer(PyUnicode.class, PyObjectSerializer.class); - this.env.registerTypeWithKryoSerializer(PyTuple.class, PyObjectSerializer.class); - this.env.registerTypeWithKryoSerializer(PyObjectDerived.class, PyObjectSerializer.class); - this.env.registerTypeWithKryoSerializer(PyInstance.class, PyObjectSerializer.class); + env.registerTypeWithKryoSerializer(PyTuple.class, PyObjectSerializer.class); + env.registerTypeWithKryoSerializer(PyObjectDerived.class, PyObjectSerializer.class); + env.registerTypeWithKryoSerializer(PyInstance.class, PyObjectSerializer.class); } public PythonDataStream create_python_source(SourceFunction<Object> src) throws Exception { - return new PythonDataStream<>(env.addSource(new PythonGeneratorFunction(src)).map(new UtilityFunctions.SerializerMap<>())); + return new 
PythonDataStream<>(env.addSource(new PythonGeneratorFunction(src)).map(new AdapterMap<>())); } /** * Add a java source to the streaming topology. The source expected to be an java based * implementation (.e.g. Kafka connector). * - * @param src A native java source (e.g. PythonFlinkKafkaConsumer09) + * @param src A native java source (e.g. PythonFlinkKafkaConsumer09) * @return Python data stream */ public PythonDataStream add_java_source(SourceFunction<Object> src) { - return new PythonDataStream<>(env.addSource(src).map(new UtilityFunctions.SerializerMap<>())); + return new PythonDataStream<>(env.addSource(src).map(new AdapterMap<>())); } /** - * A thin wrapper layer over {@link StreamExecutionEnvironment#fromElements(java.lang.Object[])} + * A thin wrapper layer over {@link StreamExecutionEnvironment#fromElements(java.lang.Object[])}. * - * @param elements - * The array of PyObject elements to create the data stream from. + * @param elements The array of PyObject elements to create the data stream from. * @return The data stream representing the given array of elements */ public PythonDataStream from_elements(PyObject... elements) { @@ -230,13 +153,11 @@ public class PythonStreamExecutionEnvironment { * <p>The input {@code Collection} is of type {@code Object}, because it is a collection * of Python elements. * There type is determined in runtime, by the Jython framework.</p> * - * @param collection - * The collection of python elements to create the data stream from. - * @return - * The data stream representing the given collection + * @param collection The collection of python elements to create the data stream from. 
+ * @return The data stream representing the given collection */ public PythonDataStream from_collection(Collection<Object> collection) { - return new PythonDataStream<>(env.fromCollection(collection).map(new UtilityFunctions.SerializerMap<>())); + return new PythonDataStream<>(env.fromCollection(collection).map(new AdapterMap<>())); } /** @@ -245,54 +166,48 @@ public class PythonStreamExecutionEnvironment { * <p>Note that this operation will result in a non-parallel data stream source, i.e., * a data stream source with a parallelism of one.</p> * - * @param iter - * The iterator of elements to create the data stream from + * @param iter The iterator of elements to create the data stream from * @return The data stream representing the elements in the iterator * @see StreamExecutionEnvironment#fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation) */ - public PythonDataStream from_collection(Iterator<Object> iter) throws Exception { + public PythonDataStream from_collection(Iterator<Object> iter) throws Exception { return new PythonDataStream<>(env.addSource(new PythonIteratorFunction(iter), TypeExtractor.getForClass(Object.class)) - .map(new UtilityFunctions.SerializerMap<>())); + .map(new AdapterMap<>())); } /** * A thin wrapper layer over {@link StreamExecutionEnvironment#generateSequence(long, long)}. 
* - * @param from - * The number to start at (inclusive) - * @param to - * The number to stop at (inclusive) + * @param from The number to start at (inclusive) + * @param to The number to stop at (inclusive) * @return A python data stream, containing all number in the [from, to] interval */ public PythonDataStream generate_sequence(long from, long to) { - return new PythonDataStream<>(env.generateSequence(from, to).map(new UtilityFunctions.SerializerMap<Long>())); + return new PythonDataStream<>(env.generateSequence(from, to).map(new AdapterMap<>())); } /** * A thin wrapper layer over {@link StreamExecutionEnvironment#readTextFile(java.lang.String)}. * - * @param path - * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path"). + * @param path The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path"). * @return The data stream that represents the data read from the given file as text lines * @throws IOException */ public PythonDataStream read_text_file(String path) throws IOException { - return new PythonDataStream<>(env.readTextFile(path).map(new UtilityFunctions.SerializerMap<String>())); + return new PythonDataStream<>(env.readTextFile(path).map(new AdapterMap<String>())); } /** * A thin wrapper layer over {@link StreamExecutionEnvironment#socketTextStream(java.lang.String, int)}. * - * @param host - * The host name which a server socket binds - * @param port - * The port number which a server socket binds. A port number of 0 means that the port number is automatically - * allocated. + * @param host The host name which a server socket binds + * @param port The port number which a server socket binds. A port number of 0 means that the port number is automatically + * allocated. 
* @return A python data stream containing the strings received from the socket */ public PythonDataStream socket_text_stream(String host, int port) { - return new PythonDataStream<>(env.socketTextStream(host, port).map(new UtilityFunctions.SerializerMap<String>())); + return new PythonDataStream<>(env.socketTextStream(host, port).map(new AdapterMap<String>())); } /** @@ -310,8 +225,7 @@ public class PythonStreamExecutionEnvironment { * A thin wrapper layer over {@link StreamExecutionEnvironment#enableCheckpointing(long, CheckpointingMode)}. * * @param interval Time interval between state checkpoints in milliseconds. - * @param mode - * The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed. + * @param mode The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed. * @return The same {@code PythonStreamExecutionEnvironment} instance of the caller */ public PythonStreamExecutionEnvironment enable_checkpointing(long interval, CheckpointingMode mode) { @@ -336,24 +250,7 @@ public class PythonStreamExecutionEnvironment { * @return The result of the job execution */ public JobExecutionResult execute() throws Exception { - return execute(false); - } - - /** - * A thin wrapper layer over {@link StreamExecutionEnvironment#execute()}. - * - * <p>In addition, it enables the caller to provide a hint about the execution mode - whether it is local - * or remote. In the case of local execution, the relevant cached files are distributed using the - * local machine temporary folder, otherwise a shared storage medium is used for this purpose.</p> - * - * @return The result of the job execution - */ - public JobExecutionResult execute(Boolean local) throws Exception { - if (PythonEnvironmentConfig.pythonTmpCachePath == null) { - // Nothing to be done! Is is executed on the task manager. 
- return new JobExecutionResult(null, 0, null); - } - distributeFiles(local); + distributeFiles(); JobExecutionResult result = this.env.execute(); cleanupDistributedFiles(); return result; @@ -366,51 +263,30 @@ public class PythonStreamExecutionEnvironment { * @throws Exception which occurs during job execution. */ public JobExecutionResult execute(String job_name) throws Exception { - return execute(job_name, false); - } - - /** - * A thin wrapper layer over {@link StreamExecutionEnvironment#execute(java.lang.String). - * - * <p>In addition, it enables the caller to provide a hint about the execution mode - whether it is local - * or remote. In the case of local execution, the relevant cached files are distributed using the - * local machine temporary folder, otherwise a shared storage medium is used for this purpose.</p> - * - * @return The result of the job execution, containing elapsed time and accumulators. - * @throws Exception which occurs during job execution. - */ - public JobExecutionResult execute(String job_name, Boolean local) throws Exception { - if (PythonEnvironmentConfig.pythonTmpCachePath == null) { - // Nothing to be done! Is is executed on the task manager. 
- return new JobExecutionResult(null, 0, null); - } - distributeFiles(local); + distributeFiles(); JobExecutionResult result = this.env.execute(job_name); cleanupDistributedFiles(); return result; } - private void distributeFiles(boolean local) throws IOException, URISyntaxException - { - String rootDir; - if (local || this.env instanceof LocalStreamEnvironment) { - rootDir = System.getProperty("java.io.tmpdir"); + private void distributeFiles() throws IOException { + Path remoteRootDir; + if (this.env instanceof LocalStreamEnvironment) { + remoteRootDir = new Path(PythonOptions.DC_TMP_DIR.defaultValue()); } else { - rootDir = "hdfs:///tmp"; + remoteRootDir = tmpDistributedDir; } - PythonEnvironmentConfig.FLINK_HDFS_PATH = Paths.get(rootDir, "flink_cache_" + - (new Random(System.currentTimeMillis())).nextLong()).toString(); + Path remoteDir = new Path(remoteRootDir, "flink_cache_" + UUID.randomUUID()); - FileCache.copy(new Path(PythonEnvironmentConfig.pythonTmpCachePath), - new Path(PythonEnvironmentConfig.FLINK_HDFS_PATH), true); + FileCache.copy(pythonTmpCachePath, remoteDir, true); - this.env.registerCachedFile(PythonEnvironmentConfig.FLINK_HDFS_PATH, PythonEnvironmentConfig.FLINK_PYTHON_DC_ID); + this.env.registerCachedFile(remoteDir.toUri().toString(), PythonConstants.FLINK_PYTHON_DC_ID); } - private void cleanupDistributedFiles() throws IOException, URISyntaxException { + private void cleanupDistributedFiles() throws IOException { for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : this.env.getCachedFiles()) { - URI fileUri = new URI(e.f1.filePath); - FileSystem fs = FileSystem.get(fileUri); + Path fileUri = new Path(e.f1.filePath); + FileSystem fs = fileUri.getFileSystem(); LOG.debug(String.format("Cleaning up cached path: %s, uriPath: %s, fileSystem: %s", e.f1.filePath, fileUri.getPath(),
http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/AbstractPythonUDF.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/AbstractPythonUDF.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/AbstractPythonUDF.java new file mode 100644 index 0000000..ca8ea04 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/AbstractPythonUDF.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.python.util.InterpreterUtils; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.apache.flink.util.FlinkException; + +import org.python.core.PyException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; + +/** + * Generic base class for wrappers of Python functions implementing the {@link Function} interface. + */ +public class AbstractPythonUDF<F extends Function> extends AbstractRichFunction { + protected Logger log = LoggerFactory.getLogger(AbstractPythonUDF.class); + private final byte[] serFun; + protected transient F fun; + + AbstractPythonUDF(F fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + public void open(Configuration parameters) throws Exception { + this.fun = InterpreterUtils.deserializeFunction(getRuntimeContext(), this.serFun); + if (this.fun instanceof RichFunction) { + try { + final RichFunction rf = (RichFunction) this.fun; + rf.setRuntimeContext(getRuntimeContext()); + rf.open(parameters); + } catch (PyException pe) { + throw createAndLogException(pe); + } + } + } + + @Override + public void close() throws Exception { + if (this.fun instanceof RichFunction) { + try { + ((RichFunction) this.fun).close(); + } catch (PyException pe) { + throw createAndLogException(pe); + } + } + } + + FlinkException createAndLogException(PyException pe) { + return createAndLogException(pe, log); + } + + static FlinkException createAndLogException(PyException pe, Logger log) { + StringWriter sw = new StringWriter(); + try (PrintWriter pw =
new PrintWriter(sw)) { + pe.printStackTrace(pw); + } + String pythonStackTrace = sw.toString().trim(); + + log.error("Python function failed: " + System.lineSeparator() + pythonStackTrace); + return new FlinkException("Python function failed: " + pythonStackTrace); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java index 52ac4ff..964ef2c 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.python.api.functions; +package org.apache.flink.streaming.python.api.functions; /** * A key type used by {@link org.apache.flink.streaming.python.api.functions.PythonKeySelector} to @@ -30,7 +30,6 @@ public class PyKey { this.data = data; } - public Object getData() { return data; } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java index 8fe3245..bf883b0 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java @@ -15,15 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.streaming.python.api.functions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.python.util.PythonCollector; -import org.apache.flink.streaming.python.util.serialization.SerializationUtils; import org.apache.flink.util.Collector; + +import org.python.core.PyException; import org.python.core.PyObject; import java.io.IOException; @@ -37,41 +38,28 @@ import java.io.IOException; * <p>This function is used internally by the Python thin wrapper layer over the streaming data * functionality</p> */ -public class PythonApplyFunction<W extends Window> extends RichWindowFunction<PyObject, PyObject, PyKey, W> { +public class PythonApplyFunction<W extends Window> extends AbstractPythonUDF<WindowFunction<PyObject, Object, Object, W>> implements WindowFunction<PyObject, PyObject, PyKey, W> { private static final long serialVersionUID = 577032239468987781L; - private final byte[] serFun; - private transient WindowFunction<PyObject, PyObject, Object, Window> fun; private transient PythonCollector collector; - public PythonApplyFunction(WindowFunction<PyObject, PyObject, Object, W> fun) throws IOException { - this.serFun = SerializationUtils.serializeObject(fun); + public PythonApplyFunction(WindowFunction<PyObject, Object, Object, W> fun) throws IOException { + super(fun); } @Override - @SuppressWarnings("unchecked") - public void open(Configuration parameters) throws Exception { - this.fun = - (WindowFunction<PyObject, PyObject, Object, Window>) UtilityFunctions.smartFunctionDeserialization( - getRuntimeContext(), this.serFun); - if (this.fun instanceof RichWindowFunction) { - final RichWindowFunction winFun = (RichWindowFunction)this.fun; - winFun.setRuntimeContext(getRuntimeContext()); - 
winFun.open(parameters); - } + public void open(Configuration config) throws Exception { + super.open(config); this.collector = new PythonCollector(); } @Override - public void close() throws Exception { - if (this.fun instanceof RichWindowFunction) { - ((RichWindowFunction)this.fun).close(); - } - } - - @Override public void apply(PyKey key, W window, Iterable<PyObject> values, Collector<PyObject> out) throws Exception { this.collector.setCollector(out); - this.fun.apply(key.getData(), window, values, this.collector); + try { + this.fun.apply(key.getData(), window, values, this.collector); + } catch (PyException pe) { + throw createAndLogException(pe); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java index 345fdcc..fab7ceb 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java @@ -15,18 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.streaming.python.api.functions; import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.RichFilterFunction; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.python.util.serialization.SerializationUtils; + +import org.python.core.PyException; import org.python.core.PyObject; import java.io.IOException; - /** * The {@code PythonFilterFunction} is a thin wrapper layer over a Python UDF {@code FilterFunction}. * It receives a {@code FilterFunction} as an input and keeps it internally in a serialized form. @@ -36,37 +34,19 @@ import java.io.IOException; * <p>This function is used internally by the Python thin wrapper layer over the streaming data * functionality</p> */ -public class PythonFilterFunction extends RichFilterFunction<PyObject> { +public class PythonFilterFunction extends AbstractPythonUDF<FilterFunction<PyObject>> implements FilterFunction<PyObject> { private static final long serialVersionUID = 775688642701399472L; - private final byte[] serFun; - private transient FilterFunction<PyObject> fun; - public PythonFilterFunction(FilterFunction<PyObject> fun) throws IOException { - this.serFun = SerializationUtils.serializeObject(fun); - } - - @Override - @SuppressWarnings("unchecked") - public void open(Configuration parameters) throws Exception { - this.fun = (FilterFunction<PyObject>) UtilityFunctions.smartFunctionDeserialization( - getRuntimeContext(), this.serFun); - if (this.fun instanceof RichFunction) { - final RichFilterFunction filterFun = (RichFilterFunction)this.fun; - filterFun.setRuntimeContext(getRuntimeContext()); - filterFun.open(parameters); - } - } - - @Override - public void close() throws Exception { - if (this.fun instanceof RichFunction) { - ((RichFilterFunction)this.fun).close(); - } + super(fun); } @Override public boolean filter(PyObject value) throws 
Exception { - return this.fun.filter(value); + try { + return this.fun.filter(value); + } catch (PyException pe) { + throw createAndLogException(pe); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java index 21554c7..a768bff 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java @@ -15,20 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.python.api.functions; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.python.util.PythonCollector; -import org.apache.flink.streaming.python.util.serialization.SerializationUtils; import org.apache.flink.util.Collector; + +import org.python.core.PyException; import org.python.core.PyObject; import java.io.IOException; - /** * The {@code PythonFlatMapFunction} is a thin wrapper layer over a Python UDF {@code FlatMapFunction}. * It receives a {@code FlatMapFunction} as an input and keeps it internally in a serialized form. 
@@ -38,41 +37,28 @@ import java.io.IOException; * <p>This function is used internally by the Python thin wrapper layer over the streaming data * functionality</p> */ -public class PythonFlatMapFunction extends RichFlatMapFunction<PyObject, PyObject> { +public class PythonFlatMapFunction extends AbstractPythonUDF<FlatMapFunction<PyObject, Object>> implements FlatMapFunction<PyObject, PyObject> { private static final long serialVersionUID = -6098432222172956477L; - private final byte[] serFun; - private transient FlatMapFunction<PyObject, PyObject> fun; private transient PythonCollector collector; - public PythonFlatMapFunction(FlatMapFunction<PyObject, PyObject> fun) throws IOException { - this.serFun = SerializationUtils.serializeObject(fun); + public PythonFlatMapFunction(FlatMapFunction<PyObject, Object> fun) throws IOException { + super(fun); } @Override - @SuppressWarnings("unchecked") public void open(Configuration config) throws Exception { - this.fun = - (FlatMapFunction<PyObject, PyObject>) UtilityFunctions.smartFunctionDeserialization( - getRuntimeContext(), serFun); - if (this.fun instanceof RichFunction) { - final RichFlatMapFunction flatMapFun = (RichFlatMapFunction)this.fun; - flatMapFun.setRuntimeContext(getRuntimeContext()); - flatMapFun.open(config); - } + super.open(config); this.collector = new PythonCollector(); } @Override - public void close() throws Exception { - if (this.fun instanceof RichFunction) { - ((RichFlatMapFunction)this.fun).close(); - } - } - - @Override public void flatMap(PyObject value, Collector<PyObject> out) throws Exception { this.collector.setCollector(out); - this.fun.flatMap(value, this.collector); + try { + this.fun.flatMap(value, this.collector); + } catch (PyException pe) { + throw createAndLogException(pe); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java index 70f212e..196edb5 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.python.api.functions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.python.util.serialization.SerializationUtils; + +import org.python.core.PyException; import java.io.IOException; @@ -33,33 +33,28 @@ import java.io.IOException; * <p>This function is used internally by the Python thin wrapper layer over the streaming data * functionality</p> */ -public class PythonGeneratorFunction extends RichSourceFunction<Object> { +public class PythonGeneratorFunction extends AbstractPythonUDF<SourceFunction<Object>> implements SourceFunction<Object> { private static final long serialVersionUID = 3854587935845323082L; - private final byte[] serFun; - private transient SourceFunction<Object> fun; - public PythonGeneratorFunction(SourceFunction<Object> fun) throws IOException { - this.serFun = SerializationUtils.serializeObject(fun); + super(fun); } - @Override - @SuppressWarnings("unchecked") - public void open(Configuration parameters) throws Exception { - this.fun = (SourceFunction<Object>) 
UtilityFunctions.smartFunctionDeserialization( - getRuntimeContext(), this.serFun); - } - - @Override - public void close() throws Exception {} - public void run(SourceContext<Object> ctx) throws Exception { - this.fun.run(ctx); + try { + this.fun.run(ctx); + } catch (PyException pe) { + throw createAndLogException(pe); + } } public void cancel() { if (this.fun != null) { - this.fun.cancel(); + try { + this.fun.cancel(); + } catch (PyException pe) { + createAndLogException(pe); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java index 38bf175..709fe19 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java @@ -15,14 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.streaming.python.api.functions; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.python.util.InterpreterUtils; import org.apache.flink.streaming.python.util.serialization.SerializationUtils; -import java.util.Iterator; +import org.python.core.PyException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; +import java.util.Iterator; /** * The {@code PythonIteratorFunction} is a thin wrapper layer over a Python UDF {@code Iterator}. @@ -35,6 +42,7 @@ import java.io.IOException; */ public class PythonIteratorFunction extends RichSourceFunction<Object> { private static final long serialVersionUID = 6741748297048588334L; + private static final Logger LOG = LoggerFactory.getLogger(PythonIteratorFunction.class); private final byte[] serFun; private transient Iterator<Object> fun; @@ -45,20 +53,38 @@ public class PythonIteratorFunction extends RichSourceFunction<Object> { } @Override - @SuppressWarnings("unchecked") public void open(Configuration parameters) throws Exception { - this.isRunning = true; - this.fun = (Iterator<Object>) UtilityFunctions.smartFunctionDeserialization(getRuntimeContext(), this.serFun); + this.fun = InterpreterUtils.deserializeFunction(getRuntimeContext(), this.serFun); + if (this.fun instanceof RichFunction) { + try { + final RichFunction winFun = (RichFunction) this.fun; + winFun.setRuntimeContext(getRuntimeContext()); + winFun.open(parameters); + } catch (PyException pe) { + throw AbstractPythonUDF.createAndLogException(pe, LOG); + } + } } @Override - public void close() throws Exception { + public void run(SourceContext<Object> ctx) throws Exception { + try { + while (isRunning && this.fun.hasNext()) { + ctx.collect(this.fun.next()); + } + } catch (PyException pe) { + throw 
AbstractPythonUDF.createAndLogException(pe, LOG); + } } @Override - public void run(SourceContext<Object> ctx) throws Exception { - while (isRunning && this.fun.hasNext()) { - ctx.collect(this.fun.next()); + public void close() throws Exception { + if (this.fun instanceof RichFunction) { + try { + ((RichFunction) this.fun).close(); + } catch (PyException pe) { + throw AbstractPythonUDF.createAndLogException(pe, LOG); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java index 7cdaacb..03a49be 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java @@ -15,11 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.streaming.python.api.functions; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.python.util.serialization.SerializationUtils; + +import org.python.core.PyException; import org.python.core.PyObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -34,6 +39,8 @@ import java.io.IOException; */ public class PythonKeySelector implements KeySelector<PyObject, PyKey> { private static final long serialVersionUID = 7403775239671366607L; + private static final Logger LOG = LoggerFactory.getLogger(PythonKeySelector.class); + private final byte[] serFun; private transient KeySelector<PyObject, Object> fun; @@ -45,9 +52,13 @@ public class PythonKeySelector implements KeySelector<PyObject, PyKey> { @SuppressWarnings("unchecked") public PyKey getKey(PyObject value) throws Exception { if (fun == null) { - fun = (KeySelector<PyObject, Object>) SerializationUtils.deserializeObject(serFun); + fun = SerializationUtils.deserializeObject(serFun); + } + try { + Object key = fun.getKey(value); + return new PyKey(key); + } catch (PyException pe) { + throw AbstractPythonUDF.createAndLogException(pe, LOG); } - Object key = fun.getKey(value); - return new PyKey(key); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java index 24d19da..bef3fcb 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java +++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java @@ -15,13 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.python.api.functions; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.apache.flink.streaming.python.util.AdapterMap; + +import org.python.core.PyException; import org.python.core.PyObject; import java.io.IOException; @@ -35,36 +35,19 @@ import java.io.IOException; * <p>This function is used internally by the Python thin wrapper layer over the streaming data * functionality</p> */ -public class PythonMapFunction extends RichMapFunction<PyObject, PyObject> { +public class PythonMapFunction extends AbstractPythonUDF<MapFunction<PyObject, PyObject>> implements MapFunction<PyObject, PyObject> { private static final long serialVersionUID = 3001212087036451818L; - private final byte[] serFun; - private transient MapFunction<PyObject, PyObject> fun; public PythonMapFunction(MapFunction<PyObject, PyObject> fun) throws IOException { - this.serFun = SerializationUtils.serializeObject(fun); - } - - @Override - @SuppressWarnings("unchecked") - public void open(Configuration config) throws Exception { - this.fun = (MapFunction<PyObject, PyObject>) UtilityFunctions.smartFunctionDeserialization( - getRuntimeContext(), serFun); - if (this.fun instanceof RichFunction) { - final RichMapFunction mapFun = (RichMapFunction)this.fun; - mapFun.setRuntimeContext(getRuntimeContext()); - mapFun.open(config); - } - } - - @Override - public void close() throws Exception { - if (this.fun instanceof RichFunction) { - 
((RichMapFunction)this.fun).close(); - } + super(fun); } @Override public PyObject map(PyObject value) throws Exception { - return UtilityFunctions.adapt(this.fun.map(value)); + try { + return AdapterMap.adapt(fun.map(value)); + } catch (PyException pe) { + throw createAndLogException(pe); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java index f022322..3e4759e 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java @@ -15,11 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.streaming.python.api.functions; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.apache.flink.util.FlinkRuntimeException; + +import org.python.core.PyException; import org.python.core.PyObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -34,6 +40,7 @@ import java.io.IOException; */ public class PythonOutputSelector implements OutputSelector<PyObject> { private static final long serialVersionUID = 909266346633598177L; + private static final Logger LOG = LoggerFactory.getLogger(PythonOutputSelector.class); private final byte[] serFun; private transient OutputSelector<PyObject> fun; @@ -43,20 +50,18 @@ public class PythonOutputSelector implements OutputSelector<PyObject> { } @Override - @SuppressWarnings("unchecked") public Iterable<String> select(PyObject value) { if (this.fun == null) { try { - this.fun = (OutputSelector<PyObject>) SerializationUtils.deserializeObject(this.serFun); - } catch (IOException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); + fun = SerializationUtils.deserializeObject(serFun); + } catch (Exception e) { + throw new FlinkRuntimeException("Failed to deserialize user-defined function.", e); } } - if (this.fun == null) { - return null; + try { + return this.fun.select(value); + } catch (PyException pe) { + throw new FlinkRuntimeException(AbstractPythonUDF.createAndLogException(pe, LOG)); } - return this.fun.select(value); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java index d02240c..1fa071d 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java @@ -15,12 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.python.api.functions; -import java.io.IOException; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.python.util.AdapterMap; import org.apache.flink.streaming.python.util.serialization.SerializationUtils; + +import org.python.core.PyException; import org.python.core.PyObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; /** * The {@code PythonReduceFunction} is a thin wrapper layer over a Python UDF {@code ReduceFunction}. 
@@ -33,6 +40,7 @@ import org.python.core.PyObject; */ public class PythonReduceFunction implements ReduceFunction<PyObject> { private static final long serialVersionUID = -9070596504893036458L; + private static final Logger LOG = LoggerFactory.getLogger(PythonReduceFunction.class); private final byte[] serFun; private transient ReduceFunction<PyObject> fun; @@ -42,12 +50,14 @@ public class PythonReduceFunction implements ReduceFunction<PyObject> { } @Override - @SuppressWarnings("unchecked") public PyObject reduce(PyObject value1, PyObject value2) throws Exception { if (fun == null) { - fun = (ReduceFunction<PyObject>) SerializationUtils.deserializeObject(serFun); + fun = SerializationUtils.deserializeObject(serFun); + } + try { + return AdapterMap.adapt(this.fun.reduce(value1, value2)); + } catch (PyException pe) { + throw AbstractPythonUDF.createAndLogException(pe, LOG); } - - return UtilityFunctions.adapt(this.fun.reduce(value1, value2)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java index e0351b0..f860e0b 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java @@ -15,14 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.streaming.python.api.functions; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.python.util.serialization.SerializationUtils; + +import org.python.core.PyException; import org.python.core.PyObject; import java.io.IOException; @@ -36,37 +34,20 @@ import java.io.IOException; * <p>This function is used internally by the Python thin wrapper layer over the streaming data * functionality</p> */ -public class PythonSinkFunction extends RichSinkFunction<PyObject> { +public class PythonSinkFunction extends AbstractPythonUDF<SinkFunction<PyObject>> implements SinkFunction<PyObject> { + private static final long serialVersionUID = -9030596504893036458L; - private final byte[] serFun; - private transient SinkFunction<PyObject> fun; - private transient RuntimeContext context; public PythonSinkFunction(SinkFunction<PyObject> fun) throws IOException { - this.serFun = SerializationUtils.serializeObject(fun); - } - - @Override - @SuppressWarnings("unchecked") - public void open(Configuration parameters) throws Exception { - this.fun = (SinkFunction<PyObject>) UtilityFunctions.smartFunctionDeserialization( - getRuntimeContext(), this.serFun); - if (this.fun instanceof RichFunction) { - final RichSinkFunction sinkFunction = (RichSinkFunction)this.fun; - sinkFunction.setRuntimeContext(getRuntimeContext()); - sinkFunction.open(parameters); - } + super(fun); } @Override - public void close() throws Exception { - if (this.fun instanceof RichFunction) { - ((RichSinkFunction)this.fun).close(); + public void invoke(PyObject value, Context context) throws Exception { + try { + this.fun.invoke(value, context); + } catch (PyException pe) { + throw 
createAndLogException(pe); } } - - @Override - public void invoke(PyObject value) throws Exception { - this.fun.invoke(value); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java deleted file mode 100644 index 0417a43..0000000 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.python.api.functions; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.python.api.environment.PythonEnvironmentConfig; -import org.apache.flink.streaming.python.util.serialization.SerializationUtils; -import org.python.core.Py; -import org.python.core.PyObject; -import org.apache.flink.api.common.functions.MapFunction; -import org.python.core.PySystemState; -import org.python.util.PythonInterpreter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.Properties; - -/** - * A collection of utility functions that are used by the python wrappers thin layer over - * the streaming functions. - */ -public class UtilityFunctions { - private static boolean jythonInitialized = false; - private static PythonInterpreter pythonInterpreter; - private static final Logger LOG = LoggerFactory.getLogger(UtilityFunctions.class); - - private UtilityFunctions() { - } - - /** - * A generic map operator that convert any java type to PyObject. It is mainly used to convert elements - * collected from a source functions, to PyObject objects. - * - * @param <IN> Any given java object - */ - public static class SerializerMap<IN> implements MapFunction<IN, PyObject> { - private static final long serialVersionUID = 1582769662549499373L; - - @Override - public PyObject map(IN value) throws Exception { - return UtilityFunctions.adapt(value); - } - } - - /** - * Convert java object to its corresponding PyObject representation. - * - * @param o Java object - * @return PyObject - */ - public static PyObject adapt(Object o) { - if (o instanceof PyObject) { - return (PyObject)o; - } - return Py.java2py(o); - } - - /** - * Deserializes Python UDF functions in a "smart" way. 
It first tries to extract the function using a common - * java method to deserialize an object. If it fails, it is assumed that the function object definition does not - * exist, and therefore, a Jython interpreter is initialized and the relevant python script is executed (without - * actually executing the 'execute' function). This result from this operation is that all the Python UDF objects - * that are defined in that python script are also defined in the local JVM. Is is now possible to extract - * the function using a common java method to deserialize the java function. - * - * @param runtimeCtx The runtime context of the executed job. - * @param serFun A serialized form of the given Python UDF - * @return An extracted java function - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - public static synchronized Object smartFunctionDeserialization(RuntimeContext runtimeCtx, byte[] serFun) throws IOException, ClassNotFoundException, InterruptedException { - try { - return SerializationUtils.deserializeObject(serFun); - } catch (Exception e) { - String path = runtimeCtx.getDistributedCache().getFile(PythonEnvironmentConfig.FLINK_PYTHON_DC_ID).getAbsolutePath(); - - initPythonInterpreter(path, new String[]{""}); - - PySystemState pySysStat = Py.getSystemState(); - pySysStat.path.add(0, path); - - String scriptFullPath = path + File.separator + PythonEnvironmentConfig.FLINK_PYTHON_PLAN_NAME; - LOG.debug("Execute python script, path=" + scriptFullPath); - pythonInterpreter.execfile(scriptFullPath); - - pySysStat.path.remove(path); - - return SerializationUtils.deserializeObject(serFun); - } - } - - /** - * Initializes the Jython interpreter and executes a python script. 
- * - * @param scriptFullPath The script full path - * @param args Command line arguments that will be delivered to the executed python script - * @throws IOException - */ - public static void initAndExecPythonScript(File scriptFullPath, String[] args) throws IOException { - initPythonInterpreter(scriptFullPath.getParent(), args); - pythonInterpreter.execfile(scriptFullPath.getAbsolutePath()); - - LOG.debug("Cleaning up temporary folder: " + scriptFullPath.getParent()); - FileSystem fs = FileSystem.get(scriptFullPath.toURI()); - fs.delete(new Path(scriptFullPath.getParent()), true); - } - - private static synchronized void initPythonInterpreter(String pythonPath, String[] args) { - if (!jythonInitialized) { - LOG.debug("Init python interpreter, path=" + pythonPath); - Properties postProperties = new Properties(); - postProperties.put("python.path", pythonPath); - PythonInterpreter.initialize(System.getProperties(), postProperties, args); - - pythonInterpreter = new PythonInterpreter(); - pythonInterpreter.setErr(System.err); - pythonInterpreter.setOut(System.out); - - jythonInitialized = true; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java deleted file mode 100644 index e6598a2..0000000 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.python.connectors; - -import org.apache.flink.streaming.python.util.serialization.PythonDeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.io.IOException; -import java.util.Properties; - -/** - * Defines a generic implementation of the {@link FlinkKafkaConsumer09} class, by using a type - * parameter of an {@code Object} class. The thin Python streaming layer converts the elements - * that are read from the Kafka feed, to a {@code PyObject} for further handling. 
- */ -public class PythonFlinkKafkaConsumer09 extends FlinkKafkaConsumer09<Object> { - - public PythonFlinkKafkaConsumer09(String topic, DeserializationSchema<Object> schema, Properties props) throws IOException { - super(topic, new PythonDeserializationSchema(schema), props); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java deleted file mode 100644 index b02b893..0000000 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.python.connectors; - -import org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.python.core.PyObject; - -import java.io.IOException; - -/** - * Defines a generic Python implementation of {@link FlinkKafkaProducer09}, by using a type - * parameter of {@code PyObject} class. It can be used as a sink in the python code. - */ -public class PythonFlinkKafkaProducer09 extends FlinkKafkaProducer09<PyObject> { - - public PythonFlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<PyObject> serializationSchema) throws IOException { - super(brokerList, topicId, new PythonSerializationSchema(serializationSchema)); - } - - public void set_log_failures_only(boolean logFailuresOnly) { - this.setLogFailuresOnly(logFailuresOnly); - } - - public void set_flush_on_checkpoint(boolean flush) { - this.setFlushOnCheckpoint(flush); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/AdapterMap.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/AdapterMap.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/AdapterMap.java new file mode 100644 index 0000000..3716375 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/AdapterMap.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.python.util; + +import org.apache.flink.api.common.functions.MapFunction; + +import org.python.core.Py; +import org.python.core.PyObject; + +/** + * A generic map operator that convert any java type to PyObject. It is mainly used to convert elements + * collected from a source functions, to PyObject objects. + * + * @param <IN> Any given java object + */ +public class AdapterMap<IN> implements MapFunction<IN, PyObject> { + private static final long serialVersionUID = 1582769662549499373L; + + /** + * Convert java object to its corresponding PyObject representation. 
+ * + * @param o Java object + * @return PyObject + */ + public static PyObject adapt(Object o) { + if (o instanceof PyObject) { + return (PyObject) o; + } + return Py.java2py(o); + } + + @Override + public PyObject map(IN value) throws Exception { + PyObject ret = adapt(value); + return ret; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/InterpreterUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/InterpreterUtils.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/InterpreterUtils.java new file mode 100644 index 0000000..8c3520d --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/InterpreterUtils.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.python.util; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.python.api.environment.PythonConstants; +import org.apache.flink.streaming.python.api.environment.PythonEnvironmentFactory; +import org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.FlinkRuntimeException; + +import org.python.core.PySystemState; +import org.python.util.PythonInterpreter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Properties; + +/** + * A collection of utility methods for interacting with jython. + * + * <p><strong>Important:</strong> This class loads the jython runtime which is essentially one big singleton. We make + * the core assumption here that this class is loaded by separate ClassLoaders for each job or task allowing multiple + * instances of jython to exist in the same JVM without affecting each other. + */ +public class InterpreterUtils { + private static final Logger LOG = LoggerFactory.getLogger(InterpreterUtils.class); + + private static PythonInterpreter pythonInterpreter = null; + private static boolean jythonInitialized = false; + + private InterpreterUtils() { + } + + /** + * Deserialize the given python function. If the functions class definition cannot be found we assume that this is + * the first invocation of this method for a given job and load the python script containing the class definition + * via jython. 
+ * + * @param context the RuntimeContext of the java function + * @param serFun serialized python UDF + * @return deserialized python UDF + * @throws FlinkException if the deserialization failed + */ + @SuppressWarnings("unchecked") + public static <X> X deserializeFunction(RuntimeContext context, byte[] serFun) throws FlinkException { + if (!jythonInitialized) { + // This branch is only tested by end-to-end tests + String path = context.getDistributedCache().getFile(PythonConstants.FLINK_PYTHON_DC_ID).getAbsolutePath(); + + String scriptName = PythonStreamExecutionEnvironment.PythonJobParameters.getScriptName(context.getExecutionConfig().getGlobalJobParameters()); + + try { + initPythonInterpreter( + new String[]{Paths.get(path, scriptName).toString()}, + path, + scriptName); + } catch (Exception e) { + LOG.error("Initialization of jython failed."); + try { + LOG.error("Initialization of jython failed.", e); + throw new FlinkRuntimeException("Initialization of jython failed.", e); + } catch (Exception ie) { + // this may occur if the initial exception relies on jython being initialized properly + LOG.error("Initialization of jython failed. Could not print original stacktrace.", ie); + throw new FlinkRuntimeException("Initialization of jython failed. Could not print original stacktrace."); + } + } + } + + try { + return (X) SerializationUtils.deserializeObject(serFun); + } catch (IOException | ClassNotFoundException ex) { + throw new FlinkException("Deserialization of user-function failed.", ex); + } + } + + /** + * Initializes the Jython interpreter and executes a python script. 
+ * + * @param factory environment factory + * @param scriptDirectory the directory containing all required user python scripts + * @param scriptName the name of the main python script + * @param args Command line arguments that will be delivered to the executed python script + */ + public static void initAndExecPythonScript(PythonEnvironmentFactory factory, java.nio.file.Path scriptDirectory, String scriptName, String[] args) { + String[] fullArgs = new String[args.length + 1]; + fullArgs[0] = scriptDirectory.resolve(scriptName).toString(); + System.arraycopy(args, 0, fullArgs, 1, args.length); + + initPythonInterpreter(fullArgs, scriptDirectory.toUri().getPath(), scriptName); + + PythonInterpreter pythonInterpreter = initPythonInterpreter(fullArgs, scriptDirectory.toUri().getPath(), scriptName); + + pythonInterpreter.set("__flink_env_factory__", factory); + pythonInterpreter.exec(scriptName + ".main(__flink_env_factory__)"); + } + + private static synchronized PythonInterpreter initPythonInterpreter(String[] args, String pythonPath, String scriptName) { + if (!jythonInitialized) { + // the java stack traces within the jython runtime aren't useful for users + System.getProperties().put("python.options.includeJavaStackInExceptions", "false"); + PySystemState.initialize(System.getProperties(), new Properties(), args); + + pythonInterpreter = new PythonInterpreter(); + + pythonInterpreter.getSystemState().path.add(0, pythonPath); + + pythonInterpreter.setErr(System.err); + pythonInterpreter.setOut(System.out); + + pythonInterpreter.exec("import " + scriptName); + jythonInitialized = true; + } + return pythonInterpreter; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java index 4caf668..702795f 100644 --- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java @@ -15,11 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.python.util; import org.apache.flink.annotation.Public; -import org.apache.flink.streaming.python.api.functions.UtilityFunctions; import org.apache.flink.util.Collector; + import org.python.core.PyObject; /** @@ -27,7 +28,7 @@ import org.python.core.PyObject; * if necessary, to a {@code PyObject}. */ @Public -public class PythonCollector implements Collector<PyObject> { +public class PythonCollector implements Collector<Object> { private Collector<PyObject> collector; public void setCollector(Collector<PyObject> collector) { @@ -35,8 +36,8 @@ public class PythonCollector implements Collector<PyObject> { } @Override - public void collect(PyObject record) { - PyObject po = UtilityFunctions.adapt(record); + public void collect(Object record) { + PyObject po = AdapterMap.adapt(record); this.collector.collect(po); } http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java index 55fbd69..f99f6d6 100644 --- 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.python.util; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyBooleanSerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyBooleanSerializer.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyBooleanSerializer.java new file mode 100644 index 0000000..fcc3e91 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyBooleanSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.python.util.serialization; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.python.core.PyBoolean; + +/** + * A {@link Serializer} implementation for {@link PyBoolean} class type. + */ +public class PyBooleanSerializer extends Serializer<PyBoolean> { + @Override + public void write(Kryo kryo, Output output, PyBoolean object) { + output.writeBoolean(object.getBooleanValue()); + } + + @Override + public PyBoolean read(Kryo kryo, Input input, Class<PyBoolean> type) { + return new PyBoolean(input.readBoolean()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyFloatSerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyFloatSerializer.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyFloatSerializer.java new file mode 100644 index 0000000..5672ff6 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyFloatSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.python.util.serialization; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.python.core.PyFloat; + +/** + * A {@link Serializer} implementation for {@link PyFloat} class type. + */ +public class PyFloatSerializer extends Serializer<PyFloat> { + @Override + public void write(Kryo kryo, Output output, PyFloat object) { + output.writeDouble(object.getValue()); + } + + @Override + public PyFloat read(Kryo kryo, Input input, Class<PyFloat> type) { + return new PyFloat(input.readDouble()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyIntegerSerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyIntegerSerializer.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyIntegerSerializer.java new file mode 100644 index 0000000..a19eb83 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyIntegerSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.python.util.serialization; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.python.core.PyInteger; + +/** + * A {@link Serializer} implementation for {@link PyInteger} class type. 
+ */ +public class PyIntegerSerializer extends Serializer<PyInteger> { + @Override + public void write(Kryo kryo, Output output, PyInteger object) { + output.writeInt(object.getValue()); + } + + @Override + public PyInteger read(Kryo kryo, Input input, Class<PyInteger> type) { + return new PyInteger(input.readInt()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9b7e4292/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyLongSerializer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyLongSerializer.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyLongSerializer.java new file mode 100644 index 0000000..7686a98 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyLongSerializer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.python.util.serialization; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.python.core.PyLong; + +import java.math.BigInteger; + +/** + * A {@link Serializer} implementation for {@link PyLong} class type. + */ +public class PyLongSerializer extends Serializer<PyLong> { + @Override + public void write(Kryo kryo, Output output, PyLong object) { + byte[] data = object.getValue().toByteArray(); + output.writeShort(data.length); + output.writeBytes(data); + } + + @Override + public PyLong read(Kryo kryo, Input input, Class<PyLong> type) { + int length = input.readShort(); + return new PyLong(new BigInteger(input.readBytes(length))); + } +}
