http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java new file mode 100644 index 0000000..6710bf1 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.api.environment; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.python.api.datastream.PythonDataStream; +import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction; +import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction; +import org.apache.flink.streaming.python.api.functions.UtilityFunctions; +import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer; +import org.python.core.PyObject; +import org.python.core.PyString; +import org.python.core.PyInteger; +import org.python.core.PyLong; +import org.python.core.PyUnicode; +import org.python.core.PyTuple; +import org.python.core.PyObjectDerived; +import org.python.core.PyInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Iterator; +import java.util.Random; + + +/** + * A thin wrapper layer over {@link StreamExecutionEnvironment}. + * + * <p>The PythonStreamExecutionEnvironment is the context in which a streaming program is executed. 
+ * </p> + * + * <p>The environment provides methods to control the job execution (such as setting the parallelism + * or the fault tolerance/checkpointing parameters) and to interact with the outside world + * (data access).</p> + */ +@PublicEvolving +public class PythonStreamExecutionEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class); + private final StreamExecutionEnvironment env; + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes + * care for required Jython serializers registration. + * + * @return The python execution environment of the context in which the program is + * executed. + */ + public static PythonStreamExecutionEnvironment get_execution_environment() { + return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment()); + } + + /** + * Creates a {@link LocalStreamEnvironment}. The local execution environment + * will run the program in a multi-threaded fashion in the same JVM as the + * environment was created in. The default parallelism of the local + * environment is the number of hardware contexts (CPU cores / threads), + * unless it was specified differently by {@link #setParallelism(int)}. + * + * @param configuration + * Pass a custom configuration into the cluster + * @return A local execution environment with the specified parallelism. + */ + public static PythonStreamExecutionEnvironment create_local_execution_environment(Configuration config) { + return new PythonStreamExecutionEnvironment(new LocalStreamEnvironment(config)); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#createLocalEnvironment(int, Configuration)} + * + * @param parallelism + * The parallelism for the local environment. + * @param config + * Pass a custom configuration into the cluster + * @return A local python execution environment with the specified parallelism. 
+ */ + public static PythonStreamExecutionEnvironment create_local_execution_environment(int parallelism, Configuration config) { + return new PythonStreamExecutionEnvironment( + StreamExecutionEnvironment.createLocalEnvironment(parallelism, config)); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment(java.lang.String, int, java.lang.String...)} + * + * @param host + * The host name or address of the master (JobManager), where the + * program should be executed. + * @param port + * The port of the master (JobManager), where the program should + * be executed. + * @param jar_files + * The JAR files with code that needs to be shipped to the + * cluster. If the program uses user-defined functions, + * user-defined input formats, or any libraries, those must be + * provided in the JAR files. + * @return A remote environment that executes the program on a cluster. + */ + public static PythonStreamExecutionEnvironment create_remote_execution_environment( + String host, int port, String... jar_files) { + return new PythonStreamExecutionEnvironment( + StreamExecutionEnvironment.createRemoteEnvironment(host, port, jar_files)); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment( + * java.lang.String, int, Configuration, java.lang.String...)} + * + * @param host + * The host name or address of the master (JobManager), where the + * program should be executed. + * @param port + * The port of the master (JobManager), where the program should + * be executed. + * @param config + * The configuration used by the client that connects to the remote cluster. + * @param jar_files + * The JAR files with code that needs to be shipped to the + * cluster. If the program uses user-defined functions, + * user-defined input formats, or any libraries, those must be + * provided in the JAR files. + * @return A remote environment that executes the program on a cluster. 
+ * + */ + public static PythonStreamExecutionEnvironment create_remote_execution_environment( + String host, int port, Configuration config, String... jar_files) { + return new PythonStreamExecutionEnvironment( + StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jar_files)); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment( + * java.lang.String, int, int, java.lang.String...)} + * + * @param host + * The host name or address of the master (JobManager), where the + * program should be executed. + * @param port + * The port of the master (JobManager), where the program should + * be executed. + * @param parallelism + * The parallelism to use during the execution. + * @param jar_files + * The JAR files with code that needs to be shipped to the + * cluster. If the program uses user-defined functions, + * user-defined input formats, or any libraries, those must be + * provided in the JAR files. + * @return A remote environment that executes the program on a cluster. + */ + public static PythonStreamExecutionEnvironment create_remote_execution_environment( + String host, int port, int parallelism, String... 
jar_files) { + return new PythonStreamExecutionEnvironment( + StreamExecutionEnvironment.createRemoteEnvironment(host, port, parallelism, jar_files)); + } + + private PythonStreamExecutionEnvironment(StreamExecutionEnvironment env) { + this.env = env; + this.registerJythonSerializers(); + } + + private void registerJythonSerializers() { + this.env.registerTypeWithKryoSerializer(PyString.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyInteger.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyLong.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyUnicode.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyTuple.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyObjectDerived.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyInstance.class, PyObjectSerializer.class); + } + + public PythonDataStream create_python_source(SourceFunction<Object> src) throws Exception { + return new PythonDataStream<>(env.addSource(new PythonGeneratorFunction(src)).map(new UtilityFunctions.SerializerMap<>())); + } + + /** + * Add a java source to the streaming topology. The source expected to be an java based + * implementation (.e.g. Kafka connector). + * + * @param src A native java source (e.g. PythonFlinkKafkaConsumer09) + * @return Python data stream + */ + public PythonDataStream add_java_source(SourceFunction<Object> src) { + return new PythonDataStream<>(env.addSource(src).map(new UtilityFunctions.SerializerMap<>())); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#fromElements(java.lang.Object[])} + * + * @param elements + * The array of PyObject elements to create the data stream from. + * @return The data stream representing the given array of elements + */ + public PythonDataStream from_elements(PyObject... 
elements) { + return new PythonDataStream<>(env.fromElements(elements)); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#fromCollection(java.util.Collection)} + * + * <p>The input {@code Collection} is of type {@code Object}, because it is a collection + * of Python elements. * There type is determined in runtime, by the Jython framework.</p> + * + * @param collection + * The collection of python elements to create the data stream from. + * @return + * The data stream representing the given collection + */ + public PythonDataStream from_collection(Collection<Object> collection) { + return new PythonDataStream<>(env.fromCollection(collection).map(new UtilityFunctions.SerializerMap<>())); + } + + /** + * Creates a python data stream from the given iterator. + * + * <p>Note that this operation will result in a non-parallel data stream source, i.e., + * a data stream source with a parallelism of one.</p> + * + * @param iter + * The iterator of elements to create the data stream from + * @return The data stream representing the elements in the iterator + * @see StreamExecutionEnvironment#fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation) + */ + public PythonDataStream from_collection(Iterator<Object> iter) throws Exception { + return new PythonDataStream<>(env.addSource(new PythonIteratorFunction(iter), TypeExtractor.getForClass(Object.class)) + .map(new UtilityFunctions.SerializerMap<>())); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#generateSequence(long, long)}. 
+ * + * @param from + * The number to start at (inclusive) + * @param to + * The number to stop at (inclusive) + * @return A python data stream, containing all number in the [from, to] interval + */ + public PythonDataStream generate_sequence(long from, long to) { + return new PythonDataStream<>(env.generateSequence(from, to).map(new UtilityFunctions.SerializerMap<Long>())); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#readTextFile(java.lang.String)}. + * + * @param path + * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path"). + * @return The data stream that represents the data read from the given file as text lines + * @throws IOException + */ + + public PythonDataStream read_text_file(String path) throws IOException { + return new PythonDataStream<>(env.readTextFile(path).map(new UtilityFunctions.SerializerMap<String>())); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#socketTextStream(java.lang.String, int)}. + * + * @param host + * The host name which a server socket binds + * @param port + * The port number which a server socket binds. A port number of 0 means that the port number is automatically + * allocated. + * @return A python data stream containing the strings received from the socket + */ + public PythonDataStream socket_text_stream(String host, int port) { + return new PythonDataStream<>(env.socketTextStream(host, port).map(new UtilityFunctions.SerializerMap<String>())); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#enableCheckpointing(long)}. + * + * @param interval Time interval between state checkpoints in milliseconds. 
+ * @return The same {@code PythonStreamExecutionEnvironment} instance of the caller + */ + public PythonStreamExecutionEnvironment enable_checkpointing(long interval) { + this.env.enableCheckpointing(interval); + return this; + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#enableCheckpointing(long, CheckpointingMode)}. + * + * @param interval Time interval between state checkpoints in milliseconds. + * @param mode + * The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed. + * @return The same {@code PythonStreamExecutionEnvironment} instance of the caller + */ + public PythonStreamExecutionEnvironment enable_checkpointing(long interval, CheckpointingMode mode) { + this.env.enableCheckpointing(interval, mode); + return this; + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#setParallelism(int)}. + * + * @param parallelism The parallelism + * @return The same {@code PythonStreamExecutionEnvironment} instance of the caller + */ + public PythonStreamExecutionEnvironment set_parallelism(int parallelism) { + this.env.setParallelism(parallelism); + return this; + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#execute()}. + * + * @return The result of the job execution + */ + public JobExecutionResult execute() throws Exception { + return execute(false); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#execute()}. + * + * <p>In addition, it enables the caller to provide a hint about the execution mode - whether it is local + * or remote. In the case of local execution, the relevant cached files are distributed using the + * local machine temporary folder, otherwise a shared storage medium is used for this purpose.</p> + * + * @return The result of the job execution + */ + public JobExecutionResult execute(Boolean local) throws Exception { + if (PythonEnvironmentConfig.pythonTmpCachePath == null) { + // Nothing to be done! 
Is is executed on the task manager. + return new JobExecutionResult(null, 0, null); + } + distributeFiles(local); + JobExecutionResult result = this.env.execute(); + cleanupDistributedFiles(); + return result; + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#execute(java.lang.String)}. + * + * @return The result of the job execution, containing elapsed time and accumulators. + * @throws Exception which occurs during job execution. + */ + public JobExecutionResult execute(String job_name) throws Exception { + return execute(job_name, false); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#execute(java.lang.String). + * + * <p>In addition, it enables the caller to provide a hint about the execution mode - whether it is local + * or remote. In the case of local execution, the relevant cached files are distributed using the + * local machine temporary folder, otherwise a shared storage medium is used for this purpose.</p> + * + * @return The result of the job execution, containing elapsed time and accumulators. + * @throws Exception which occurs during job execution. + */ + public JobExecutionResult execute(String job_name, Boolean local) throws Exception { + if (PythonEnvironmentConfig.pythonTmpCachePath == null) { + // Nothing to be done! Is is executed on the task manager. 
+ return new JobExecutionResult(null, 0, null); + } + distributeFiles(local); + JobExecutionResult result = this.env.execute(job_name); + cleanupDistributedFiles(); + return result; + } + + private void distributeFiles(boolean local) throws IOException, URISyntaxException + { + String rootDir; + if (local || this.env instanceof LocalStreamEnvironment) { + rootDir = System.getProperty("java.io.tmpdir"); + } else { + rootDir = "hdfs:///tmp"; + } + PythonEnvironmentConfig.FLINK_HDFS_PATH = Paths.get(rootDir, "flink_cache_" + + (new Random(System.currentTimeMillis())).nextLong()).toString(); + + FileCache.copy(new Path(PythonEnvironmentConfig.pythonTmpCachePath), + new Path(PythonEnvironmentConfig.FLINK_HDFS_PATH), true); + + this.env.registerCachedFile(PythonEnvironmentConfig.FLINK_HDFS_PATH, PythonEnvironmentConfig.FLINK_PYTHON_DC_ID); + } + + private void cleanupDistributedFiles() throws IOException, URISyntaxException { + for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : this.env.getCachedFiles()) { + URI fileUri = new URI(e.f1.filePath); + FileSystem fs = FileSystem.get(fileUri); + LOG.debug(String.format("Cleaning up cached path: %s, uriPath: %s, fileSystem: %s", + e.f1.filePath, + fileUri.getPath(), + fs.getClass().getName())); + fs.delete(new Path(fileUri.getPath()), true); + } + } +}
// http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PyKey.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.Objects;

/**
 * A key type used by {@link org.apache.flink.streaming.python.api.functions.PythonKeySelector} to
 * host a python object and provide the necessary interface to compare two python objects.
 * It is used internally by the python thin wrapper layer over the streaming data sets.
 *
 * <p>Note: the hosted object is mutable via {@link #setData(Object)}; callers must not mutate a
 * key while it is used for partitioning, or the hash contract is broken.</p>
 */
public class PyKey {
	/** The wrapped python object; may be {@code null}. */
	private Object data;

	public PyKey(Object data) {
		this.data = data;
	}

	public Object getData() {
		return data;
	}

	public void setData(Object data) {
		this.data = data;
	}

	@Override
	public boolean equals(Object other) {
		if (!(other instanceof PyKey)) {
			return false;
		}
		// Objects.equals is null-safe: the original direct data.equals(...) call
		// threw a NullPointerException when either key wrapped null.
		return Objects.equals(((PyKey) other).data, this.data);
	}

	@Override
	public int hashCode() {
		// Null-safe: returns 0 for a null payload, consistent with equals above.
		return Objects.hashCode(data);
	}
}
// http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonApplyFunction.java
+ */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.python.util.PythonCollector; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.apache.flink.util.Collector; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * The {@code PythonApplyFunction} is a thin wrapper layer over a Python UDF {@code WindowFunction}. + * It receives an {@code WindowFunction} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code WindowFunction}. + * + * <p>This function is used internally by the Python thin wrapper layer over the streaming data + * functionality</p> + */ +public class PythonApplyFunction<W extends Window> extends RichWindowFunction<PyObject, PyObject, PyKey, W> { + private static final long serialVersionUID = 577032239468987781L; + + private final byte[] serFun; + private transient WindowFunction<PyObject, PyObject, Object, Window> fun; + private transient PythonCollector collector; + + public PythonApplyFunction(WindowFunction<PyObject, PyObject, Object, W> fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public void open(Configuration parameters) throws Exception { + this.fun = + (WindowFunction<PyObject, PyObject, Object, Window>) UtilityFunctions.smartFunctionDeserialization( + getRuntimeContext(), this.serFun); + if (this.fun instanceof RichWindowFunction) { + final RichWindowFunction winFun = (RichWindowFunction)this.fun; + 
winFun.setRuntimeContext(getRuntimeContext()); + winFun.open(parameters); + } + this.collector = new PythonCollector(); + } + + @Override + public void close() throws Exception { + if (this.fun instanceof RichWindowFunction) { + ((RichWindowFunction)this.fun).close(); + } + } + + @Override + public void apply(PyKey key, W window, Iterable<PyObject> values, Collector<PyObject> out) throws Exception { + this.collector.setCollector(out); + this.fun.apply(key.getData(), window, values, this.collector); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java new file mode 100644 index 0000000..345fdcc --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFilterFunction.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.python.core.PyObject; + +import java.io.IOException; + + +/** + * The {@code PythonFilterFunction} is a thin wrapper layer over a Python UDF {@code FilterFunction}. + * It receives a {@code FilterFunction} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code FilterFunction}. + * + * <p>This function is used internally by the Python thin wrapper layer over the streaming data + * functionality</p> + */ +public class PythonFilterFunction extends RichFilterFunction<PyObject> { + private static final long serialVersionUID = 775688642701399472L; + + private final byte[] serFun; + private transient FilterFunction<PyObject> fun; + + public PythonFilterFunction(FilterFunction<PyObject> fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public void open(Configuration parameters) throws Exception { + this.fun = (FilterFunction<PyObject>) UtilityFunctions.smartFunctionDeserialization( + getRuntimeContext(), this.serFun); + if (this.fun instanceof RichFunction) { + final RichFilterFunction filterFun = (RichFilterFunction)this.fun; + filterFun.setRuntimeContext(getRuntimeContext()); + filterFun.open(parameters); + } + } + + @Override + public void close() throws Exception { + if (this.fun instanceof RichFunction) { + 
((RichFilterFunction)this.fun).close(); + } + } + + @Override + public boolean filter(PyObject value) throws Exception { + return this.fun.filter(value); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java new file mode 100644 index 0000000..21554c7 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonFlatMapFunction.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.python.util.PythonCollector; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.apache.flink.util.Collector; +import org.python.core.PyObject; + +import java.io.IOException; + + +/** + * The {@code PythonFlatMapFunction} is a thin wrapper layer over a Python UDF {@code FlatMapFunction}. + * It receives a {@code FlatMapFunction} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code FlatMapFunction}. + * + * <p>This function is used internally by the Python thin wrapper layer over the streaming data + * functionality</p> + */ +public class PythonFlatMapFunction extends RichFlatMapFunction<PyObject, PyObject> { + private static final long serialVersionUID = -6098432222172956477L; + + private final byte[] serFun; + private transient FlatMapFunction<PyObject, PyObject> fun; + private transient PythonCollector collector; + + public PythonFlatMapFunction(FlatMapFunction<PyObject, PyObject> fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public void open(Configuration config) throws Exception { + this.fun = + (FlatMapFunction<PyObject, PyObject>) UtilityFunctions.smartFunctionDeserialization( + getRuntimeContext(), serFun); + if (this.fun instanceof RichFunction) { + final RichFlatMapFunction flatMapFun = (RichFlatMapFunction)this.fun; + flatMapFun.setRuntimeContext(getRuntimeContext()); + flatMapFun.open(config); + } + this.collector = new 
PythonCollector(); + } + + @Override + public void close() throws Exception { + if (this.fun instanceof RichFunction) { + ((RichFlatMapFunction)this.fun).close(); + } + } + + @Override + public void flatMap(PyObject value, Collector<PyObject> out) throws Exception { + this.collector.setCollector(out); + this.fun.flatMap(value, this.collector); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java new file mode 100644 index 0000000..70f212e --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonGeneratorFunction.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; + +import java.io.IOException; + +/** + * The {@code PythonGeneratorFunction} is a thin wrapper layer over a Python UDF {@code SourceFunction}. + * It receives a {@code SourceFunction} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code SourceFunction}. + * + * <p>This function is used internally by the Python thin wrapper layer over the streaming data + * functionality</p> + */ +public class PythonGeneratorFunction extends RichSourceFunction<Object> { + private static final long serialVersionUID = 3854587935845323082L; + + private final byte[] serFun; + private transient SourceFunction<Object> fun; + + public PythonGeneratorFunction(SourceFunction<Object> fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public void open(Configuration parameters) throws Exception { + this.fun = (SourceFunction<Object>) UtilityFunctions.smartFunctionDeserialization( + getRuntimeContext(), this.serFun); + } + + @Override + public void close() throws Exception {} + + public void run(SourceContext<Object> ctx) throws Exception { + this.fun.run(ctx); + } + + public void cancel() { + if (this.fun != null) { + this.fun.cancel(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java new file mode 100644 index 0000000..38bf175 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonIteratorFunction.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; + +import java.util.Iterator; +import java.io.IOException; + +/** + * The {@code PythonIteratorFunction} is a thin wrapper layer over a Python UDF {@code Iterator}. + * It receives an {@code Iterator} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code Iterator}. 
+ * + * <p>This function is used internally by the Python thin wrapper layer over the streaming data + * functionality</p> + */ +public class PythonIteratorFunction extends RichSourceFunction<Object> { + private static final long serialVersionUID = 6741748297048588334L; + + private final byte[] serFun; + private transient Iterator<Object> fun; + private transient volatile boolean isRunning; + + public PythonIteratorFunction(Iterator<Object> fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public void open(Configuration parameters) throws Exception { + this.isRunning = true; + this.fun = (Iterator<Object>) UtilityFunctions.smartFunctionDeserialization(getRuntimeContext(), this.serFun); + } + + @Override + public void close() throws Exception { + } + + @Override + public void run(SourceContext<Object> ctx) throws Exception { + while (isRunning && this.fun.hasNext()) { + ctx.collect(this.fun.next()); + } + } + + @Override + public void cancel() { + isRunning = false; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java new file mode 100644 index 0000000..7cdaacb --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonKeySelector.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * The {@code PythonKeySelector} is a thin wrapper layer over a Python UDF {@code KeySelector}. + * It receives a {@code KeySelector} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code KeySelector}. 
+ * + * <p>This function is used internally by the Python thin wrapper layer over the streaming data + * functionality</p> + */ +public class PythonKeySelector implements KeySelector<PyObject, PyKey> { + private static final long serialVersionUID = 7403775239671366607L; + private final byte[] serFun; + private transient KeySelector<PyObject, Object> fun; + + public PythonKeySelector(KeySelector<PyObject, PyKey> fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public PyKey getKey(PyObject value) throws Exception { + if (fun == null) { + fun = (KeySelector<PyObject, Object>) SerializationUtils.deserializeObject(serFun); + } + Object key = fun.getKey(value); + return new PyKey(key); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java new file mode 100644 index 0000000..24d19da --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonMapFunction.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * The {@code PythonMapFunction} is a thin wrapper layer over a Python UDF {@code MapFunction}. + * It receives a {@code MapFunction} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code MapFunction}. 
+ * + * <p>This function is used internally by the Python thin wrapper layer over the streaming data + * functionality</p> + */ +public class PythonMapFunction extends RichMapFunction<PyObject, PyObject> { + private static final long serialVersionUID = 3001212087036451818L; + private final byte[] serFun; + private transient MapFunction<PyObject, PyObject> fun; + + public PythonMapFunction(MapFunction<PyObject, PyObject> fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public void open(Configuration config) throws Exception { + this.fun = (MapFunction<PyObject, PyObject>) UtilityFunctions.smartFunctionDeserialization( + getRuntimeContext(), serFun); + if (this.fun instanceof RichFunction) { + final RichMapFunction mapFun = (RichMapFunction)this.fun; + mapFun.setRuntimeContext(getRuntimeContext()); + mapFun.open(config); + } + } + + @Override + public void close() throws Exception { + if (this.fun instanceof RichFunction) { + ((RichMapFunction)this.fun).close(); + } + } + + @Override + public PyObject map(PyObject value) throws Exception { + return UtilityFunctions.adapt(this.fun.map(value)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java new file mode 100644 index 0000000..f022322 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * The {@code PythonOutputSelector} is a thin wrapper layer over a Python UDF {@code OutputSelector}. + * It receives an {@code OutputSelector} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code OutputSelector}. 
+ * + * <p>This function is used internally by the Python thin wrapper layer over the streaming data + * functionality</p> + */ +public class PythonOutputSelector implements OutputSelector<PyObject> { + private static final long serialVersionUID = 909266346633598177L; + + private final byte[] serFun; + private transient OutputSelector<PyObject> fun; + + public PythonOutputSelector(OutputSelector<PyObject> fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public Iterable<String> select(PyObject value) { + if (this.fun == null) { + try { + this.fun = (OutputSelector<PyObject>) SerializationUtils.deserializeObject(this.serFun); + } catch (IOException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + } + if (this.fun == null) { + return null; + } + return this.fun.select(value); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java new file mode 100644 index 0000000..d02240c --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonReduceFunction.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.functions; + +import java.io.IOException; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.python.core.PyObject; + +/** + * The {@code PythonReduceFunction} is a thin wrapper layer over a Python UDF {@code ReduceFunction}. + * It receives a {@code ReduceFunction} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code ReduceFunction}. 
+ * + * <p>This function is used internally by the Python thin wrapper layer over the streaming data + * functionality</p> + */ +public class PythonReduceFunction implements ReduceFunction<PyObject> { + private static final long serialVersionUID = -9070596504893036458L; + + private final byte[] serFun; + private transient ReduceFunction<PyObject> fun; + + public PythonReduceFunction(ReduceFunction<PyObject> fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public PyObject reduce(PyObject value1, PyObject value2) throws Exception { + if (fun == null) { + fun = (ReduceFunction<PyObject>) SerializationUtils.deserializeObject(serFun); + } + + return UtilityFunctions.adapt(this.fun.reduce(value1, value2)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java new file mode 100644 index 0000000..e0351b0 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonSinkFunction.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * The {@code PythonSinkFunction} is a thin wrapper layer over a Python UDF {@code SinkFunction}. + * It receives a {@code SinkFunction} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code SinkFunction}. 
+ * + * <p>This function is used internally by the Python thin wrapper layer over the streaming data + * functionality</p> + */ +public class PythonSinkFunction extends RichSinkFunction<PyObject> { + private static final long serialVersionUID = -9030596504893036458L; + private final byte[] serFun; + private transient SinkFunction<PyObject> fun; + private transient RuntimeContext context; + + public PythonSinkFunction(SinkFunction<PyObject> fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public void open(Configuration parameters) throws Exception { + this.fun = (SinkFunction<PyObject>) UtilityFunctions.smartFunctionDeserialization( + getRuntimeContext(), this.serFun); + if (this.fun instanceof RichFunction) { + final RichSinkFunction sinkFunction = (RichSinkFunction)this.fun; + sinkFunction.setRuntimeContext(getRuntimeContext()); + sinkFunction.open(parameters); + } + } + + @Override + public void close() throws Exception { + if (this.fun instanceof RichFunction) { + ((RichSinkFunction)this.fun).close(); + } + } + + @Override + public void invoke(PyObject value) throws Exception { + this.fun.invoke(value); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java new file mode 100644 index 0000000..0417a43 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/UtilityFunctions.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.python.api.environment.PythonEnvironmentConfig; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.python.core.Py; +import org.python.core.PyObject; +import org.apache.flink.api.common.functions.MapFunction; +import org.python.core.PySystemState; +import org.python.util.PythonInterpreter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +/** + * A collection of utility functions that are used by the python wrappers thin layer over + * the streaming functions. + */ +public class UtilityFunctions { + private static boolean jythonInitialized = false; + private static PythonInterpreter pythonInterpreter; + private static final Logger LOG = LoggerFactory.getLogger(UtilityFunctions.class); + + private UtilityFunctions() { + } + + /** + * A generic map operator that convert any java type to PyObject. 
It is mainly used to convert elements + * collected from a source functions, to PyObject objects. + * + * @param <IN> Any given java object + */ + public static class SerializerMap<IN> implements MapFunction<IN, PyObject> { + private static final long serialVersionUID = 1582769662549499373L; + + @Override + public PyObject map(IN value) throws Exception { + return UtilityFunctions.adapt(value); + } + } + + /** + * Convert java object to its corresponding PyObject representation. + * + * @param o Java object + * @return PyObject + */ + public static PyObject adapt(Object o) { + if (o instanceof PyObject) { + return (PyObject)o; + } + return Py.java2py(o); + } + + /** + * Deserializes Python UDF functions in a "smart" way. It first tries to extract the function using a common + * java method to deserialize an object. If it fails, it is assumed that the function object definition does not + * exist, and therefore, a Jython interpreter is initialized and the relevant python script is executed (without + * actually executing the 'execute' function). This result from this operation is that all the Python UDF objects + * that are defined in that python script are also defined in the local JVM. Is is now possible to extract + * the function using a common java method to deserialize the java function. + * + * @param runtimeCtx The runtime context of the executed job. 
+ * @param serFun A serialized form of the given Python UDF + * @return An extracted java function + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + public static synchronized Object smartFunctionDeserialization(RuntimeContext runtimeCtx, byte[] serFun) throws IOException, ClassNotFoundException, InterruptedException { + try { + return SerializationUtils.deserializeObject(serFun); + } catch (Exception e) { + String path = runtimeCtx.getDistributedCache().getFile(PythonEnvironmentConfig.FLINK_PYTHON_DC_ID).getAbsolutePath(); + + initPythonInterpreter(path, new String[]{""}); + + PySystemState pySysStat = Py.getSystemState(); + pySysStat.path.add(0, path); + + String scriptFullPath = path + File.separator + PythonEnvironmentConfig.FLINK_PYTHON_PLAN_NAME; + LOG.debug("Execute python script, path=" + scriptFullPath); + pythonInterpreter.execfile(scriptFullPath); + + pySysStat.path.remove(path); + + return SerializationUtils.deserializeObject(serFun); + } + } + + /** + * Initializes the Jython interpreter and executes a python script. 
+ * + * @param scriptFullPath The script full path + * @param args Command line arguments that will be delivered to the executed python script + * @throws IOException + */ + public static void initAndExecPythonScript(File scriptFullPath, String[] args) throws IOException { + initPythonInterpreter(scriptFullPath.getParent(), args); + pythonInterpreter.execfile(scriptFullPath.getAbsolutePath()); + + LOG.debug("Cleaning up temporary folder: " + scriptFullPath.getParent()); + FileSystem fs = FileSystem.get(scriptFullPath.toURI()); + fs.delete(new Path(scriptFullPath.getParent()), true); + } + + private static synchronized void initPythonInterpreter(String pythonPath, String[] args) { + if (!jythonInitialized) { + LOG.debug("Init python interpreter, path=" + pythonPath); + Properties postProperties = new Properties(); + postProperties.put("python.path", pythonPath); + PythonInterpreter.initialize(System.getProperties(), postProperties, args); + + pythonInterpreter = new PythonInterpreter(); + pythonInterpreter.setErr(System.err); + pythonInterpreter.setOut(System.out); + + jythonInitialized = true; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java new file mode 100644 index 0000000..e6598a2 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaConsumer09.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.python.connectors; + +import org.apache.flink.streaming.python.util.serialization.PythonDeserializationSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.io.IOException; +import java.util.Properties; + +/** + * Defines a generic implementation of the {@link FlinkKafkaConsumer09} class, by using a type + * parameter of an {@code Object} class. The thin Python streaming layer converts the elements + * that are read from the Kafka feed, to a {@code PyObject} for further handling. 
+ */ +public class PythonFlinkKafkaConsumer09 extends FlinkKafkaConsumer09<Object> { + + public PythonFlinkKafkaConsumer09(String topic, DeserializationSchema<Object> schema, Properties props) throws IOException { + super(topic, new PythonDeserializationSchema(schema), props); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java new file mode 100644 index 0000000..b02b893 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/connectors/PythonFlinkKafkaProducer09.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.connectors; + +import org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * Defines a generic Python implementation of {@link FlinkKafkaProducer09}, by using a type + * parameter of {@code PyObject} class. It can be used as a sink in the python code. + */ +public class PythonFlinkKafkaProducer09 extends FlinkKafkaProducer09<PyObject> { + + public PythonFlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<PyObject> serializationSchema) throws IOException { + super(brokerList, topicId, new PythonSerializationSchema(serializationSchema)); + } + + public void set_log_failures_only(boolean logFailuresOnly) { + this.setLogFailuresOnly(logFailuresOnly); + } + + public void set_flush_on_checkpoint(boolean flush) { + this.setFlushOnCheckpoint(flush); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java new file mode 100644 index 0000000..4caf668 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.util; + +import org.apache.flink.annotation.Public; +import org.apache.flink.streaming.python.api.functions.UtilityFunctions; +import org.apache.flink.util.Collector; +import org.python.core.PyObject; + +/** + * Collects a {@code PyObject} record and forwards it. It makes sure that the record is converted, + * if necessary, to a {@code PyObject}. + */ +@Public +public class PythonCollector implements Collector<PyObject> { + private Collector<PyObject> collector; + + public void setCollector(Collector<PyObject> collector) { + this.collector = collector; + } + + @Override + public void collect(PyObject record) { + PyObject po = UtilityFunctions.adapt(record); + this.collector.collect(po); + } + + @Override + public void close() { + this.collector.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java new file mode 100644 index 0000000..55fbd69 --- /dev/null +++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonIterator.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.util; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * An interface definition for Python iterator. It is extended by Python user-defined + * functions (UDF), within the Python scripts. 
/**
 * An interface definition for Python iterator. It is extended by Python user-defined
 * functions (UDF), within the Python scripts.
 *
 * <p>The element type is {@code Iterator<Object>} rather than the raw {@code Iterator}: the
 * erasure and the {@code next()} return type ({@code Object}) are unchanged, so existing
 * implementers keep working, while callers no longer trip raw-type warnings.
 */
public interface PythonIterator extends Iterator<Object>, Serializable {
}
+ */ +package org.apache.flink.streaming.python.util.serialization; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * A serializer implementation for PyObject class type. Is is used by the Kryo serialization + * framework. {@see https://github.com/EsotericSoftware/kryo#serializers} + */ +public class PyObjectSerializer extends Serializer<PyObject> { + + public void write (Kryo kryo, Output output, PyObject po) { + try { + byte[] serPo = SerializationUtils.serializeObject(po); + output.writeInt(serPo.length); + output.write(serPo); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public PyObject read (Kryo kryo, Input input, Class<PyObject> type) { + int len = input.readInt(); + byte[] serPo = new byte[len]; + input.read(serPo); + PyObject po = null; + try { + po = (PyObject) SerializationUtils.deserializeObject(serPo); + } catch (IOException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + return po; + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java new file mode 100644 index 0000000..a51db89 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonDeserializationSchema.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.util.serialization; + +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.io.IOException; + +/** + * A Python deserialization schema, which implements {@link DeserializationSchema}. It converts a + * serialized form of {@code PyObject} into its Java Object representation. 
+ */ +public class PythonDeserializationSchema implements DeserializationSchema<Object> { + private static final long serialVersionUID = -9180596504893036458L; + private final TypeInformation<Object> resultType = TypeInformation.of(new TypeHint<Object>(){}); + + private final byte[] serSchema; + private transient DeserializationSchema<Object> schema; + + public PythonDeserializationSchema(DeserializationSchema<Object> schema) throws IOException { + this.serSchema = SerializationUtils.serializeObject(schema); + } + + @SuppressWarnings("unchecked") + public Object deserialize(byte[] message) throws IOException { + if (this.schema == null) { + try { + this.schema = (DeserializationSchema<Object>) SerializationUtils.deserializeObject(this.serSchema); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + } + return this.schema.deserialize(message); + } + + @Override + public boolean isEndOfStream(Object nextElement) { + return this.schema.isEndOfStream(nextElement); + } + + @Override + public TypeInformation<Object> getProducedType() { + return resultType; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java new file mode 100644 index 0000000..a2dddaf --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PythonSerializationSchema.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.util.serialization; + +import java.io.IOException; + +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.python.core.PyObject; + +/** + * A Python serialization schema, which implements {@link SerializationSchema}. It converts + * a {@code PyObject} into its serialized form. 
+ */ +public class PythonSerializationSchema implements SerializationSchema<PyObject> { + private static final long serialVersionUID = -9170596504893036458L; + + private final byte[] serSchema; + private transient SerializationSchema<PyObject> schema; + + public PythonSerializationSchema(SerializationSchema<PyObject> schema) throws IOException { + this.serSchema = SerializationUtils.serializeObject(schema); + } + + @Override + @SuppressWarnings("unchecked") + public byte[] serialize(PyObject element) { + if (this.schema == null) { + try { + this.schema = (SerializationSchema<PyObject>)SerializationUtils.deserializeObject(this.serSchema); + } catch (IOException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + } + return this.schema.serialize(element); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java new file mode 100644 index 0000000..b7fc2a3 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/SerializationUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.util.serialization; + +import org.apache.flink.streaming.python.api.datastream.PythonObjectInputStream2; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +/** + * A generic serialization utility, which turns java objects into their serialization form as + * byte arrays and vise versa. + */ +public class SerializationUtils { + public static byte[] serializeObject(Object o) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(o); + oos.flush(); + return baos.toByteArray(); + } + } + + public static Object deserializeObject(byte[] bytes) throws IOException, ClassNotFoundException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); ObjectInputStream ois = new PythonObjectInputStream2(bais)) { + return ois.readObject(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java new file mode 100644 index 
0000000..5e4ec51 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api; + +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.streaming.python.api.PythonStreamBinder; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +public class PythonStreamBinderTest extends StreamingProgramTestBase { + final private static String defaultPythonScriptName = "run_all_tests.py"; + final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python"; + final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api"; + + public PythonStreamBinderTest() { + } + + public static void main(String[] args) throws Exception { + if (args.length == 0) { + args = prepareDefaultArgs(); + } else { + args[0] = 
findStreamTestFile(args[0]).getAbsolutePath(); + } + PythonStreamBinder.main(args); + } + + @Override + public void testProgram() throws Exception { + this.main(new String[]{}); + } + + private static String[] prepareDefaultArgs() throws Exception { + File testFullPath = findStreamTestFile(defaultPythonScriptName); + List<String> filesInTestPath = getFilesInFolder(testFullPath.getParent()); + + String[] args = new String[filesInTestPath.size() + 1]; + args[0] = testFullPath.getAbsolutePath(); + + for (final ListIterator<String> it = filesInTestPath.listIterator(); it.hasNext();) { + final String p = it.next(); + args[it.previousIndex() + 1] = p; + } + return args; + } + + private static File findStreamTestFile(String name) throws Exception { + if (new File(name).exists()) { + return new File(name); + } + FileSystem fs = FileSystem.getLocalFileSystem(); + String workingDir = fs.getWorkingDirectory().getPath(); + if (!workingDir.endsWith(flinkPythonRltvPath)) { + workingDir += File.separator + flinkPythonRltvPath; + } + FileStatus[] status = fs.listStatus( + new Path( workingDir + File.separator + pathToStreamingTests)); + for (FileStatus f : status) { + String file_name = f.getPath().getName(); + if (file_name.equals(name)) { + return new File(f.getPath().getPath()); + } + } + throw new FileNotFoundException(); + } + + private static List<String> getFilesInFolder(String path) { + List<String> results = new ArrayList<>(); + File[] files = new File(path).listFiles(); + if (files != null) { + for (File file : files) { + if (file.isDirectory() || file.getName().startsWith("test_")) { + results.add("." + File.separator + file.getName()); + } + } + } + return results; + } +}
