[FLINK-5886][py] Add Python Streaming API This closes #3838.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00284fb8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00284fb8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00284fb8 Branch: refs/heads/master Commit: 00284fb8131ecb064a08a2da010d27ae95806744 Parents: 1809e9f Author: Zohar Mizrahi <[email protected]> Authored: Tue Nov 15 13:46:36 2016 +0100 Committer: zentol <[email protected]> Committed: Sat Feb 17 15:59:56 2018 +0100 ---------------------------------------------------------------------- docs/.gitignore | 6 + docs/dev/stream/python.md | 649 +++++++++++++++++++ flink-dist/pom.xml | 7 + flink-dist/src/main/assemblies/bin.xml | 18 +- .../src/main/flink-bin/bin/pyflink-stream.sh | 25 + .../src/main/python/fibonacci.py | 134 ++++ flink-libraries/flink-streaming-python/pom.xml | 104 +++ .../python/api/PythonStreamBinder.java | 196 ++++++ .../python/api/datastream/PythonDataStream.java | 269 ++++++++ .../api/datastream/PythonIterativeStream.java | 57 ++ .../api/datastream/PythonKeyedStream.java | 88 +++ .../datastream/PythonObjectInputStream2.java | 68 ++ .../PythonSingleOutputStreamOperator.java | 37 ++ .../api/datastream/PythonSplitStream.java | 51 ++ .../api/datastream/PythonWindowedStream.java | 69 ++ .../environment/PythonEnvironmentConfig.java | 39 ++ .../PythonStreamExecutionEnvironment.java | 421 ++++++++++++ .../streaming/python/api/functions/PyKey.java | 54 ++ .../api/functions/PythonApplyFunction.java | 77 +++ .../api/functions/PythonFilterFunction.java | 72 ++ .../api/functions/PythonFlatMapFunction.java | 78 +++ .../api/functions/PythonGeneratorFunction.java | 65 ++ .../api/functions/PythonIteratorFunction.java | 69 ++ .../python/api/functions/PythonKeySelector.java | 53 ++ .../python/api/functions/PythonMapFunction.java | 70 ++ .../api/functions/PythonOutputSelector.java | 62 ++ .../api/functions/PythonReduceFunction.java | 53 ++ 
.../api/functions/PythonSinkFunction.java | 72 ++ .../python/api/functions/UtilityFunctions.java | 143 ++++ .../connectors/PythonFlinkKafkaConsumer09.java | 38 ++ .../connectors/PythonFlinkKafkaProducer09.java | 44 ++ .../streaming/python/util/PythonCollector.java | 47 ++ .../streaming/python/util/PythonIterator.java | 28 + .../util/serialization/PyObjectSerializer.java | 59 ++ .../PythonDeserializationSchema.java | 62 ++ .../PythonSerializationSchema.java | 53 ++ .../util/serialization/SerializationUtils.java | 46 ++ .../python/api/PythonStreamBinderTest.java | 100 +++ .../flink/streaming/python/api/run_all_tests.py | 75 +++ .../flink/streaming/python/api/test_filter.py | 82 +++ .../streaming/python/api/test_flatmap_int.py | 77 +++ .../python/api/test_flatmap_list_int.py | 76 +++ .../streaming/python/api/test_flatmap_string.py | 76 +++ .../python/api/test_from_collection.py | 67 ++ .../streaming/python/api/test_from_elements.py | 67 ++ .../streaming/python/api/test_from_iterator.py | 84 +++ .../python/api/test_generate_sequence.py | 65 ++ .../streaming/python/api/test_iterations.py | 69 ++ .../flink/streaming/python/api/test_kafka09.py | 157 +++++ .../python/api/test_keyed_stream_reduce.py | 69 ++ .../flink/streaming/python/api/test_map.py | 83 +++ .../flink/streaming/python/api/test_map_int.py | 77 +++ .../streaming/python/api/test_read_text_file.py | 82 +++ .../python/api/test_socket_text_stream.py | 95 +++ .../streaming/python/api/test_split_select.py | 80 +++ .../flink/streaming/python/api/test_union.py | 77 +++ .../streaming/python/api/test_user_type.py | 86 +++ .../streaming/python/api/test_window_apply.py | 72 ++ .../streaming/python/api/test_word_count.py | 84 +++ .../streaming/python/api/test_write_as_text.py | 68 ++ .../python/api/test_write_to_socket.py | 106 +++ .../streaming/python/api/utils/__init__.py | 17 + .../streaming/python/api/utils/constants.py | 21 + .../python/api/utils/pygeneratorbase.py | 36 + .../python/api/utils/python_test_base.py | 35 
+ .../flink/streaming/python/api/utils/utils.py | 47 ++ flink-libraries/pom.xml | 1 + 67 files changed, 5613 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/docs/.gitignore ---------------------------------------------------------------------- diff --git a/docs/.gitignore b/docs/.gitignore new file mode 100644 index 0000000..98b6f6b --- /dev/null +++ b/docs/.gitignore @@ -0,0 +1,6 @@ +.bundle/ +.jekyll-metadata +.rubydeps/ +content/ +ruby2/.bundle/ +ruby2/.rubydeps/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/docs/dev/stream/python.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/python.md b/docs/dev/stream/python.md new file mode 100644 index 0000000..02edbfa --- /dev/null +++ b/docs/dev/stream/python.md @@ -0,0 +1,649 @@ +--- +title: "Python Programming Guide (Streaming)" +is_beta: true +nav-title: Python API +nav-parent_id: streaming +nav-pos: 63 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
Streaming analysis programs in Flink are regular programs
+ +{% highlight python %} +from org.apache.flink.streaming.api.functions.source import SourceFunction +from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction +from org.apache.flink.api.java.functions import KeySelector +from org.apache.flink.python.api.jython import PythonStreamExecutionEnvironment +from org.apache.flink.streaming.api.windowing.time.Time import milliseconds + + +class Generator(SourceFunction): + def __init__(self, num_iters): + self._running = True + self._num_iters = num_iters + + def run(self, ctx): + counter = 0 + while self._running and counter < self._num_iters: + ctx.collect('Hello World') + counter += 1 + + def cancel(self): + self._running = False + + +class Tokenizer(FlatMapFunction): + def flatMap(self, value, collector): + for word in value.lower().split(): + collector.collect((1, word)) + + +class Selector(KeySelector): + def getKey(self, input): + return input[1] + + +class Sum(ReduceFunction): + def reduce(self, input1, input2): + count1, word1 = input1 + count2, word2 = input2 + return (count1 + count2, word1) + +def main(): + env = PythonStreamExecutionEnvironment.get_execution_environment() + env.create_python_source(Generator(num_iters=1000)) \ + .flat_map(Tokenizer()) \ + .key_by(Selector()) \ + .time_window(milliseconds(50)) \ + .reduce(Sum()) \ + .print() + env.execute() + + +if __name__ == '__main__': + main() +{% endhighlight %} + +**Notes:** + +- If execution is done on a local cluster, you may replace the last line in the `main()` function + with **`env.execute(True)`** +- Execution on a multi-node cluster requires a shared medium storage, which needs to be configured (.e.g HDFS) + upfront. +- The output from of the given script is directed to the standard output. Consequently, the output + is written to the corresponding worker `.out` file. If the script is executed inside the IntelliJ IDE, + then the output will be displayed in the console tab. 
+ +{% top %} + +Program Skeleton +---------------- +As we already saw in the example, Flink streaming programs look like regular Python programs. +Each program consists of the same basic parts: + +1. A `main()` function definition, without arguments - the program entry point, +2. Obtain an `Environment`, +3. Load/create the initial data, +4. Specify transformations on this data, +5. Specify where to put the results of your computations, and +5. Execute your program. + +We will now give an overview of each of those steps but please refer to the respective sections for +more details. + +The `main()` function is a must and it is used by Flink execution layer to run the +given Python streaming program. + +The `Environment` is the basis for all Flink programs. You can +obtain one using these static methods on class `PythonStreamExecutionEnvironment`: + +{% highlight python %} +get_execution_environment() +{% endhighlight %} + +For specifying data sources the streaming execution environment has several methods. +To just read a text file as a sequence of lines, you can use: + +{% highlight python %} +env = get_execution_environment() +text = env.read_text_file("file:///path/to/file") +{% endhighlight %} + +This will give you a DataStream on which you can then apply transformations. For +more information on data sources and input formats, please refer to +[Data Sources](#data-sources). + +Once you have a DataStream you can apply transformations to create a new +DataStream which you can then write to a file, transform again, or +combine with other DataStreams. You apply transformations by calling +methods on DataStream with your own custom transformation function. For example, +a map transformation looks like this: + +{% highlight python %} +class Doubler(MapFunction): + def map(self, value): + return value * 2 + +data.map(Doubler()) +{% endhighlight %} + +This will create a new DataStream by doubling every value in the original DataStream. 
+For more information and a list of all the transformations, +please refer to [Transformations](#transformations). + +Once you have a DataStream that needs to be written to disk you can call one +of these methods on DataStream: + +{% highlight python %} +data.write_as_text("<file-path>") +data.write_as_text("<file-path>", mode=WriteMode.OVERWRITE) +data.print() +{% endhighlight %} + +The last method is only useful for developing/debugging on a local machine, +it will output the contents of the DataSet to standard output. (Note that in +a cluster, the result goes to the standard out stream of the cluster nodes and ends +up in the *.out* files of the workers). +The first two do as the name suggests. +Please refer to [Data Sinks](#data-sinks) for more information on writing to files. + +Once you specified the complete program you need to call `execute` on +the `Environment`. This will either execute on your local machine or submit your program +for execution on a cluster, depending on how Flink was started. You can force +a local execution by using `execute(True)`. + +{% top %} + +Project setup +--------------- + +Apart from setting up Flink, no additional work is required. Using Jython to execute the Python +script, means that no external packages are needed and the program is executed as if it was a jar file. + +The Python API was tested on Linux/OSX systems. + +{% top %} + +Lazy Evaluation +--------------- + +All Flink programs are executed lazily: When the program's main method is executed, the data loading +and transformations do not happen directly. Rather, each operation is created and added to the +program's plan. The operations are actually executed when one of the `execute()` methods is invoked +on the Environment object. Whether the program is executed locally or on a cluster depends +on the environment of the program. + +The lazy evaluation lets you construct sophisticated programs that Flink executes as one +holistically planned unit. 
+ +{% top %} + +Transformations +--------------- + +Data transformations transform one or more DataStreams into a new DataStream. Programs can combine +multiple transformations into sophisticated assemblies. + +This section gives a brief overview of the available transformations. The [transformations +documentation](dataset_transformations.html) has a full description of all transformations with +examples. + +<br /> + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Transformation</th> + <th class="text-center">Description</th> + </tr> + </thead> + + <tbody> + <tr> + <td><strong>Map</strong><br>PythonDataStream → PythonDataStream</td> + <td> + <p>Takes one element and produces one element.</p> +{% highlight python %} +class Doubler(MapFunction): + def map(self, value): + return value * 2 + +data_stream.map(Doubler()) +{% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>FlatMap</strong><br>PythonDataStream → PythonDataStream</td> + <td> + <p>Takes one element and produces zero, one, or more elements. </p> +{% highlight python %} +class Tokenizer(FlatMapFunction): + def flatMap(self, word, collector): + collector.collect((1, word)) + +data_stream.flat_map(Tokenizer()) +{% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>Filter</strong><br>PythonDataStream → PythonDataStream</td> + <td> + <p>Evaluates a boolean function for each element and retains those for which the function + returns true.</p> +{% highlight python %} +class GreaterThen1000(FilterFunction): + def filter(self, value): + return value > 1000 + +data_stream.filter(GreaterThen1000()) +{% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>KeyBy</strong><br>PythonDataStream → PythonKeyedStream</td> + <td> + <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. + Internally, this is implemented with hash partitioning. 
See <a href="/dev/api_concepts#specifying-keys">keys</a> on how to specify keys. + This transformation returns a PythonKeyedDataStream.</p> + {% highlight python %} +class Selector(KeySelector): + def getKey(self, input): + return input[1] # Key by the second element in a tuple + +data_stream.key_by(Selector()) // Key by field "someKey" + {% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>Reduce</strong><br>PythonKeyedStream → PythonDataStream</td> + <td> + <p>A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and + emits the new value.</p> +{% highlight python %} +class Sum(ReduceFunction): + def reduce(self, input1, input2): + count1, val1 = input1 + count2, val2 = input2 + return (count1 + count2, val1) + +data.reduce(Sum()) +{% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>Window</strong><br>PythonKeyedStream → PythonWindowedStream</td> + <td> + <p>Windows can be defined on already partitioned KeyedStreams. Windows group the data in each + key according to some characteristic (e.g., the data that arrived within the last 5 seconds). + See <a href="windows.html">windows</a> for a complete description of windows. + {% highlight python %} +keyed_stream.count_window(10, 5) # Last 10 elements, sliding (jumping) by 5 elements + +keyed_stream.time_window(milliseconds(30)) # Last 30 milliseconds of data + +keted_stream.time_window(milliseconds(100), milliseconds(20)) # Last 100 milliseconds of data, sliding (jumping) by 20 milliseconds + {% endhighlight %} + </p> + </td> + </tr> + + <tr> + <td><strong>Window Apply</strong><br>PythonWindowedStream → PythonDataStream</td> + <td> + <p>Applies a general function to the window as a whole. 
Below is a function that manually sums + the elements of a window.</p> + {% highlight python %} +class WindowSum(WindowFunction): + def apply(self, key, window, values, collector): + sum = 0 + for value in values: + sum += value[0] + collector.collect((key, sum)) + +windowed_stream.apply(WindowSum()) + {% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>Window Reduce</strong><br>PythonWindowedStream → PythonDataStream</td> + <td> + <p>Applies a functional reduce function to the window and returns the reduced value.</p> + {% highlight python %} +class Sum(ReduceFunction): + def reduce(self, input1, input2): + count1, val1 = input1 + count2, val2 = input2 + return (count1 + count2, val1) + +windowed_stream.reduce(Sum()) + {% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>Union</strong><br>PythonDataStream* → PythonDataStream</td> + <td> + <p>Union of two or more data streams creating a new stream containing all the elements from all + the streams. Note: If you union a data stream with itself you will get each element twice + in the resulting stream.</p> + {% highlight python %} +data_stream.union(other_stream1, other_stream2, ...); + {% endhighlight %} + </td> + </tr> + + <tr> + <td><strong>Split</strong><br>PythonDataStream → PythonSplitStream</td> + <td> + <p>Split the stream into two or more streams according to some criterion. + {% highlight python %} +class StreamSelector(OutputSelector): + def select(self, value): + return ["even"] if value % 2 == 0 else ["odd"] + +splited_stream = data_stream.split(StreamSelector()) + {% endhighlight %} + </p> + </td> + </tr> + + <tr> + <td><strong>Select</strong><br>SplitStream → DataStream</td> + <td> + <p> Select one or more streams from a split stream. 
+ {% highlight python %} +even_data_stream = splited_stream.select("even") +odd_data_stream = splited_stream.select("odd") +all_data_stream = splited_stream.select("even", "odd") + {% endhighlight %} + </p> + </td> + </tr> + + <tr> + <td><strong>Iterate</strong><br>PythonDataStream → PythonIterativeStream → PythonDataStream</td> + <td> + <p> Creates a "feedback" loop in the flow, by redirecting the output of one operator + to some previous operator. This is especially useful for defining algorithms that + continuously update a model. The following code starts with a stream and applies + the iteration body continuously. Elements that are greater than 0 are sent back + to the feedback channel, and the rest of the elements are forwarded downstream. + See <a href="#iterations">iterations</a> for a complete description. + {% highlight java %} +class MinusOne(MapFunction): + def map(self, value): + return value - 1 + +class PositiveNumber(FilterFunction): + def filter(self, value): + return value > 0 + +class LessEquelToZero(FilterFunction): + def filter(self, value): + return value <= 0 + +iteration = initial_stream.iterate(5000) +iteration_body = iteration.map(MinusOne()) +feedback = iteration_body.filter(PositiveNumber()) +iteration.close_with(feedback) +output = iteration_body.filter(LessEquelToZero()) + {% endhighlight %} + </p> + </td> + </tr> + + </tbody> +</table> + +{% top %} + +Passing Functions to Flink +-------------------------- + +Certain operations require user-defined functions as arguments. All the functions should be +defined as Python classes that derived from the relevant Flink function. User-defined functions +are serialized and sent over to the TaskManagers for execution. + +{% highlight python %} +class Filter(FilterFunction): + def filter(self, value): + return value > 5 + +data_stream.filter(Filter()) +{% endhighlight %} + +Rich functions (.e.g `RichFilterFunction`) enable to define (override) the optional operations: `open` & `close`. 
+The user may use these functions for initialization and cleanups. + +{% highlight python %} +class Tockenizer(RichMapFunction): + def open(self, config): + pass + def close(self): + pass + def map(self, value): + pass + +data_stream.map(Tockenizer()) +{% endhighlight %} + +The `open` function is called by the worker before starting the streaming pipeline. +The `close` function is called by the worker after the streaming pipeline is stopped. + +{% top %} + +Data Types +---------- + +Flink's Python Streaming API offers support for primitive Python types (int, float, bool, +string), as well as byte arrays and user-defined classes. + +{% highlight python %} +class Person: + def __init__(self, name, age): + self.name = name + self.age = age + +class Tokenizer(MapFunction): + def map(self, value): + return (1, Person(*value)) + +data_stream.map(Tokenizer()) +{% endhighlight %} + +#### Tuples/Lists + +You can use the tuples (or lists) for composite types. Python tuples are mapped to Jython native corresponding +types, which are handled by the Python wrapper thin layer. + +{% highlight python %} +word_counts = env.from_elements(("hello", 1), ("world",2)) + +class Tokenizer(MapFunction): + def map(self, value): + return value[1] + +counts = word_counts.map(Tokenizer()) +{% endhighlight %} + +{% top %} + +Data Sources +------------ + +Data sources create the initial data streams, such as from files or from collections. + +File-based: + +- `read_text_file(path)` - reads files line wise and returns them as a stream of Strings. + +Collection-based: + +- `from_elements(*args)` - Creates a data stream from all the elements. +- `generate_sequence(from, to)` - Generates the sequence of numbers in the given interval, in parallel. 
+ +**Examples** + +{% highlight python %} +env = PythonStreamExecutionEnvironment.get_execution_environment() + +\# read text file from local files system +localLiens = env.read_text("file:///path/to/my/textfile") + +\# read text file from a HDFS running at nnHost:nnPort +hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile") + +\# create a set from some given elements +values = env.from_elements("Foo", "bar", "foobar", "fubar") + +\# generate a number sequence +numbers = env.generate_sequence(1, 10000000) +{% endhighlight %} + +{% top %} + +Data Sinks +---------- + +Data sinks consume DataStreams and are used to store or return them: + +- `write_as_text()` - Writes elements line-wise as Strings. The Strings are + obtained by calling the *str()* method of each element. +- `print()` - Prints the *str()* value of each element on the + standard out. +- `write_to_socket()` - Writes the DataStream to a socket [host:port] as a byte array. + +A DataStream can be input to multiple operations. Programs can write or print a data stream and at the +same time run additional transformations on them. + +**Examples** + +Standard data sink methods: + +{% highlight scala %} + write DataStream to a file on the local file system +textData.write_as_text("file:///my/result/on/localFS") + + write DataStream to a file on a HDFS with a namenode running at nnHost:nnPort +textData.write_as_text("hdfs://nnHost:nnPort/my/result/on/localFS") + + write DataStream to a file and overwrite the file if it exists +textData.write_as_text("file:///my/result/on/localFS", WriteMode.OVERWRITE) + + this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines +values.write_as_text("file:///path/to/the/result/file") +{% endhighlight %} + +{% top %} + +Parallel Execution +------------------ + +This section describes how the parallel execution of programs can be configured in Flink. A Flink +program consists of multiple tasks (operators, data sources, and sinks). 
A task is split into +several parallel instances for execution and each parallel instance processes a subset of the task's +input data. The number of parallel instances of a task is called its *parallelism* or *degree of +parallelism (DOP)*. + +The degree of parallelism of a task can be specified in Flink on different levels. + +### Execution Environment Level + +Flink programs are executed in the context of an [execution environment](#program-skeleton). An +execution environment defines a default parallelism for all operators, data sources, and data sinks +it executes. Execution environment parallelism can be overwritten by explicitly configuring the +parallelism of an operator. + +The default parallelism of an execution environment can be specified by calling the +`set_parallelism()` method. To execute all operators, data sources, and data sinks of the +[WordCount](#example-program) example program with a parallelism of `3`, set the default parallelism of the +execution environment as follows: + +{% highlight python %} +env = PythonStreamExecutionEnvironment.get_execution_environment() +env.set_parallelism(3) + +text.flat_map(Tokenizer()) \ + .key_by(Selector()) \ + .time_window(milliseconds(30)) \ + .reduce(Sum()) \ + .print() + +env.execute() +{% endhighlight %} + +### System Level + +A system-wide default parallelism for all execution environments can be defined by setting the +`parallelism.default` property in `./conf/flink-conf.yaml`. See the +[Configuration]({{ site.baseurl }}/setup/config.html) documentation for details. + +{% top %} + +Executing Plans +--------------- + +To run the plan with Flink, go to your Flink distribution, and run the pyflink-stream.sh script from the /bin +folder. The script containing the plan has to be passed as the first argument, followed by a number of additional +Python packages, and finally, separated by `-`additional arguments that will be fed to the script. 
{% highlight bash %} +./bin/pyflink-stream.sh <Script>[ <pathToPackage1>[ <pathToPackageX>]][ - <param1>[ <paramX>]] +{% endhighlight %}
</includes> </fileSet> + <!-- copy python jar --> + <fileSet> + <directory>../flink-libraries/flink-streaming-python/target</directory> + <outputDirectory>lib</outputDirectory> + <fileMode>0644</fileMode> + <includes> + <include>flink-python_${scala.binary.version}-${project.version}.jar</include> + </includes> + </fileSet> + <!-- copy python example to examples of dist --> <fileSet> <directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example</directory> <outputDirectory>examples/python</outputDirectory> <fileMode>0755</fileMode> </fileSet> + <!-- copy python streaming example to examples of dist --> + <fileSet> + <directory>../flink-examples/flink-examples-streaming/src/main/python</directory> + <outputDirectory>examples/streaming/python</outputDirectory> + <fileMode>0755</fileMode> + </fileSet> </fileSets> http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-dist/src/main/flink-bin/bin/pyflink-stream.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/pyflink-stream.sh b/flink-dist/src/main/flink-bin/bin/pyflink-stream.sh new file mode 100644 index 0000000..c31f702 --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/pyflink-stream.sh @@ -0,0 +1,25 @@ +#!/bin/bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +"$FLINK_BIN_DIR"/flink run --class org.apache.flink.streaming.python.api.PythonStreamBinder -v "$FLINK_ROOT_DIR"/lib/flink-streaming-python*.jar "$@" http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-examples/flink-examples-streaming/src/main/python/fibonacci.py ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/python/fibonacci.py b/flink-examples/flink-examples-streaming/src/main/python/fibonacci.py new file mode 100644 index 0000000..96f90c8 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/python/fibonacci.py @@ -0,0 +1,134 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# +# An example that illustrates iterations in Flink streaming. The program sums up random numbers and counts +# additions. The operation is done until it reaches a specific threshold in an iterative streaming fashion. +# + +import random +import sys +from org.apache.flink.api.common.functions import MapFunction +from org.apache.flink.streaming.api.collector.selector import OutputSelector +from org.apache.flink.streaming.api.functions.source import SourceFunction +from org.apache.flink.api.java.utils import ParameterTool +from org.apache.flink.streaming.python.api.environment import PythonStreamExecutionEnvironment + +TARGET_VAL = 100 +MAX_INT_START = 50 + + +class Generator(SourceFunction): + def __init__(self, num_iters=1000): + self._running = True + self._num_iters = num_iters + + def run(self, ctx): + counter = 0 + while self._running and counter < self._num_iters: + self.do(ctx) + counter += 1 + + def do(self, ctx): + two_numbers = "{}, {}".format(random.randrange(1, MAX_INT_START), random.randrange(1, MAX_INT_START)) + ctx.collect(two_numbers) + + def cancel(self): + self._running = False + + +class Fib(MapFunction): + def map(self, value): + return (value[0], value[1], value[3], value[2] + value[3], value[4] + 1) + + +class InPut(MapFunction): + def map(self, value): + num1, num2 = value.split(",") + + num1 = int(num1.strip()) + num2 = int(num2.strip()) + + return (num1, num2, num1, num2, 0) + + +class OutPut(MapFunction): + def map(self, value): + return ((value[0], value[1]), value[4]) + + +class StreamSelector(OutputSelector): + def select(self, value): + return ["iterate"] if value[3] < TARGET_VAL else ["output"] + + +class Main: + def __init__(self, args): + self._args = args + + def run(self): + _params = ParameterTool.fromArgs(self._args) + + env = 
PythonStreamExecutionEnvironment.get_execution_environment() + + # create input stream of integer pairs + if "--input" in self._args: + try: + file_path = _params.get("input") + input_stream = env.read_text_file(file_path) + except Exception as e: + print(e) + print ("Error in reading input file. Exiting...") + sys.exit(5) + else: + input_stream = env.create_python_source(Generator(num_iters=50)) + + # create an iterative data stream from the input with 5 second timeout + it = input_stream.map(InPut()).iterate(5000) + + # apply the step function to get the next Fibonacci number + # increment the counter and split the output with the output selector + step = it.map(Fib()).split(StreamSelector()) + it.close_with(step.select("iterate")) + + # to produce the final output select the tuples directed to the + # 'output' channel then get the input pairs that have the greatest iteration counter + # on a 1 second sliding window + output = step.select("output") + parsed_output = output.map(OutPut()) + if "--output" in self._args: + try: + file_path = _params.get("output") + parsed_output.write_as_text(file_path) + except Exception as e: + print (e) + print ("Error in writing to output file. 
will print instead.") + parsed_output.print() + else: + parsed_output.print() + result = env.execute("Fibonacci Example (py)", True if _params.has("local") else False) + print("Fibonacci job completed, job_id={}".format(result.jobID)) + + +def main(): + argv = sys.argv[1:] if len(sys.argv) > 1 else [] + Main(argv).run() + + +if __name__ == '__main__': + main() http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/pom.xml b/flink-libraries/flink-streaming-python/pom.xml new file mode 100644 index 0000000..68e205a --- /dev/null +++ b/flink-libraries/flink-streaming-python/pom.xml @@ -0,0 +1,104 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-libraries</artifactId> + <version>1.4-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-streaming-python_${scala.binary.version}</artifactId> + <name>flink-streaming-python</name> + <packaging>jar</packaging> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <archive> + <manifest> + <addClasspath>true</addClasspath> + <mainClass>org.apache.flink.streaming.python.api.PythonStreamBinder</mainClass> + </manifest> + </archive> + </configuration> + </plugin> + </plugins> + </build> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.python</groupId> + <artifactId>jython-standalone</artifactId> + <version>2.7.0</version> + </dependency> + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.9_2.10</artifactId> + <version>1.2.0</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java new file mode 100644 index 0000000..ad5f432 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.api; +import org.apache.commons.io.FilenameUtils; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.streaming.python.api.environment.PythonEnvironmentConfig; +import org.apache.flink.streaming.python.api.functions.UtilityFunctions; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.PrintWriter; +import java.io.IOException; + +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.Random; + +/** + * Allows the execution of Flink stream plan that is written in Python + */ +public class PythonStreamBinder { + private static final Random r = new Random(System.currentTimeMillis()); + public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_streaming_plan_"; + + + private PythonStreamBinder() { + } + + /** + * Entry point for the execution of a python streaming task. + * + * @param args <pathToScript> [<pathToPackage1> .. [<pathToPackageX]] - [parameter1]..[parameterX] + * @throws Exception + */ + public static void main(String[] args) throws Exception { + PythonStreamBinder binder = new PythonStreamBinder(); + binder.runPlan(args); + } + + private void runPlan(String[] args) throws Exception { + File script; + if (args.length < 1) { + System.out.println("Usage: prog <pathToScript> [<pathToPackage1> .. 
[<pathToPackageX]] - [parameter1]..[parameterX]"); + return; + } + else { + script = new File(args[0]); + if ((!script.exists()) || (!script.isFile())) + { + throw new FileNotFoundException("Could not find file: " + args[0]); + } + } + + int split = 0; + for (int x = 0; x < args.length; x++) { + if (args[x].compareTo("-") == 0) { + split = x; + } + } + + GlobalConfiguration.loadConfiguration(); + + String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextLong(); + prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split)); + + if (split != 0) { + String[] a = new String[args.length - split]; + a[0] = args[0]; + System.arraycopy(args, split + 1, a, 1, args.length - (split +1)); + args = a; + } else if (args.length > 1) { + args = new String[]{args[0]}; + } + + UtilityFunctions.initAndExecPythonScript(new File( + tmpPath + File.separator + PythonEnvironmentConfig.FLINK_PYTHON_PLAN_NAME), args); + } + + /** + * Prepares a single package from the given python script. This involves in generating a unique main + * python script, which loads the user's provided script for execution. The reason for this is to create + * a different namespace for different script contents. In addition, it copies all relevant files + * to a temporary folder for execution and distribution. + * + * @param tempFilePath The path to copy all the files to + * @param filePaths The user's script and dependent packages/files + * @throws IOException + * @throws URISyntaxException + */ + private void prepareFiles(String tempFilePath, String... 
filePaths) throws IOException, URISyntaxException, + NoSuchAlgorithmException { + PythonEnvironmentConfig.pythonTmpCachePath = tempFilePath; + + Files.createDirectories(Paths.get(tempFilePath)); + + String dstMainScriptFullPath = tempFilePath + File.separator + FilenameUtils.getBaseName(filePaths[0]) + + calcPythonMd5(filePaths[0]) + ".py"; + + generateAndCopyPlanFile(tempFilePath, dstMainScriptFullPath); + + //main script + copyFile(filePaths[0], tempFilePath, FilenameUtils.getName(dstMainScriptFullPath)); + + String parentDir = new File(filePaths[0]).getParent(); + + //additional files/folders(modules) + for (int x = 1; x < filePaths.length; x++) { + String currentParent = (new File(filePaths[x])).getParent(); + if (currentParent.startsWith(".")) { + filePaths[x] = parentDir + File.separator + filePaths[x]; + } + copyFile(filePaths[x], tempFilePath, null); + } + } + + private void generateAndCopyPlanFile(String dstPath, String mainScriptFullPath) throws IOException { + String moduleName = FilenameUtils.getBaseName(mainScriptFullPath); + + FileWriter fileWriter = new FileWriter(dstPath + File.separator + PythonEnvironmentConfig.FLINK_PYTHON_PLAN_NAME); + PrintWriter printWriter = new PrintWriter(fileWriter); + printWriter.printf("import %s\n\n", moduleName); + printWriter.printf("%s.main()\n", moduleName); + printWriter.close(); + } + + private void copyFile(String path, String target, String name) throws IOException, URISyntaxException { + if (path.endsWith(File.separator)) { + path = path.substring(0, path.length() - 1); + } + String identifier = name == null ? path.substring(path.lastIndexOf(File.separator)) : name; + String tmpFilePath = target + File.separator + identifier; + Path p = new Path(path); + FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true); + } + + + /** + * Naive MD5 calculation from the python content of the given script. Spaces, blank lines and comments + * are ignored. 
+ * + * @param filePath the full path of the given python script + * @return the md5 value as a string + * @throws NoSuchAlgorithmException + * @throws IOException + */ + private String calcPythonMd5(String filePath) throws NoSuchAlgorithmException, IOException { + FileReader fileReader = new FileReader(filePath); + BufferedReader bufferedReader = new BufferedReader(fileReader); + + String line; + MessageDigest md = MessageDigest.getInstance("MD5"); + while ((line = bufferedReader.readLine()) != null) { + line = line.trim(); + if (line.isEmpty() || line.startsWith("#")) { + continue; + } + byte[] bytes = line.getBytes(); + md.update(bytes, 0, bytes.length); + } + fileReader.close(); + + byte[] mdBytes = md.digest(); + + //convert the byte to hex format method 1 + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < mdBytes.length; i++) { + sb.append(Integer.toString((mdBytes[i] & 0xff) + 0x100, 16).substring(1)); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java new file mode 100644 index 0000000..96feb0b --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.datastream; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.python.api.functions.PyKey; +import org.apache.flink.streaming.python.api.functions.PythonFilterFunction; +import org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction; +import org.apache.flink.streaming.python.api.functions.PythonKeySelector; +import org.apache.flink.streaming.python.api.functions.PythonMapFunction; +import org.apache.flink.streaming.python.api.functions.PythonOutputSelector; +import org.apache.flink.streaming.python.api.functions.PythonSinkFunction; +import org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import 
org.python.core.PyObject; + +import java.io.IOException; +import java.util.ArrayList; + + +/** + * A {@code PythonDataStream} is a thin wrapper layer over {@link DataStream}, which represents a + * stream of elements of the same type. A {@code PythonDataStream} can be transformed into + * another {@code PythonDataStream} by applying various transformation functions, such as + * <ul> + * <li>{@link PythonDataStream#map} + * <li>{@link PythonDataStream#split} + * </ul> + * + * <p>A thin wrapper layer means that the functionality itself is performed by the + * {@link DataStream}, however instead of working directly with the streaming data sets, + * this layer handles Python wrappers (e.g. {@code PythonDataStream}) to comply with the + * Python standard coding styles.</p> + */ +@PublicEvolving +public class PythonDataStream<D extends DataStream<PyObject>> { + protected final D stream; + + public PythonDataStream(D stream) { + this.stream = stream; + } + + /** + * A thin wrapper layer over {@link DataStream#union(DataStream[])}. + * + * @param streams + * The Python DataStreams to union output with. + * @return The {@link PythonDataStream}. + */ + @SafeVarargs + @SuppressWarnings("unchecked") + public final PythonDataStream union(PythonDataStream... streams) { + ArrayList<DataStream<PyObject>> dsList = new ArrayList<>(); + for (PythonDataStream ps : streams) { + dsList.add(ps.stream); + } + DataStream<PyObject>[] dsArray = new DataStream[dsList.size()]; + return new PythonDataStream(stream.union(dsList.toArray(dsArray))); + } + + /** + * A thin wrapper layer over {@link DataStream#split(OutputSelector)}. + * + * @param output_selector + * The user defined {@link OutputSelector} for directing the tuples. 
+ * @return The {@link PythonSplitStream} + */ + public PythonSplitStream split(OutputSelector<PyObject> output_selector) throws IOException { + return new PythonSplitStream(this.stream.split(new PythonOutputSelector(output_selector))); + } + + /** + * A thin wrapper layer over {@link DataStream#filter(FilterFunction)}. + * + * @param filter + * The FilterFunction that is called for each element of the DataStream. + * @return The filtered {@link PythonDataStream}. + */ + public PythonSingleOutputStreamOperator filter(FilterFunction<PyObject> filter) throws IOException { + return new PythonSingleOutputStreamOperator(stream.filter(new PythonFilterFunction(filter))); + } + + /** + * A thin wrapper layer over {@link DataStream#map(MapFunction)}. + * + * @param mapper + * The MapFunction that is called for each element of the + * DataStream. + * @return The transformed {@link PythonDataStream}. + */ + public PythonDataStream<SingleOutputStreamOperator<PyObject>> map( + MapFunction<PyObject, PyObject> mapper) throws IOException { + return new PythonDataStream<>(stream.map(new PythonMapFunction(mapper))); + } + + /** + * A thin wrapper layer over {@link DataStream#flatMap(FlatMapFunction)}. + * + * @param flat_mapper + * The FlatMapFunction that is called for each element of the + * DataStream + * + * @return The transformed {@link PythonDataStream}. + */ + public PythonDataStream<SingleOutputStreamOperator<PyObject>> flat_map( + FlatMapFunction<PyObject, PyObject> flat_mapper) throws IOException { + return new PythonDataStream<>(stream.flatMap(new PythonFlatMapFunction(flat_mapper))); + } + + /** + * A thin wrapper layer over {@link DataStream#keyBy(KeySelector)}. + * + * @param selector + * The KeySelector to be used for extracting the key for partitioning + * @return The {@link PythonDataStream} with partitioned state (i.e. 
{@link PythonKeyedStream}) + */ + public PythonKeyedStream key_by(KeySelector<PyObject, PyKey> selector) throws IOException { + return new PythonKeyedStream(stream.keyBy(new PythonKeySelector(selector))); + } + + /** + * A thin wrapper layer over {@link DataStream#print()}. + */ + @PublicEvolving + public void print() { + stream.print(); + } + + /** + * A thin wrapper layer over {@link DataStream#writeAsText(java.lang.String)}. + * + * @param path + * The path pointing to the location the text file is written to. + */ + @PublicEvolving + public void write_as_text(String path) { + stream.writeAsText(path); + } + + /** + * A thin wrapper layer over {@link DataStream#writeAsText(java.lang.String, WriteMode)}. + * + * @param path + * The path pointing to the location the text file is written to + * @param mode + * Controls the behavior for existing files. Options are + * NO_OVERWRITE and OVERWRITE. + */ + @PublicEvolving + public void write_as_text(String path, WriteMode mode) { + stream.writeAsText(path, mode); + } + + /** + * A thin wrapper layer over {@link DataStream#writeToSocket(java.lang.String, int, SerializationSchema)} + * + * @param host + * host of the socket + * @param port + * port of the socket + * @param schema + * schema for serialization + */ + @PublicEvolving + public void write_to_socket(String host, Integer port, SerializationSchema<PyObject> schema) throws IOException { + stream.writeToSocket(host, port, new PythonSerializationSchema(schema)); + } + + /** + * A thin wrapper layer over {@link DataStream#addSink(SinkFunction)} + * + * @param sink_func + * The object containing the sink's invoke function. + */ + @PublicEvolving + public void add_sink(SinkFunction<PyObject> sink_func) throws IOException { + stream.addSink(new PythonSinkFunction(sink_func)); + } + + /** + * A thin wrapper layer over {@link DataStream#iterate()}. + * + * <p>Initiates an iterative part of the program that feeds back data streams. 
+ * The iterative part needs to be closed by calling + * {@link PythonIterativeStream#close_with(PythonDataStream)}. The transformation of + * this IterativeStream will be the iteration head. The data stream + * given to the {@link PythonIterativeStream#close_with(PythonDataStream)} method is + * the data stream that will be fed back and used as the input for the + * iteration head. </p> + * <p> + * A common usage pattern for streaming iterations is to use output + * splitting to send a part of the closing data stream to the head. Refer to + * {@link #split(OutputSelector)} for more information. + * <p> + * The iteration edge will be partitioned the same way as the first input of + * the iteration head unless it is changed in the + * {@link PythonIterativeStream#close_with(PythonDataStream)} call. + * <p> + * By default a PythonDataStream with iteration will never terminate, but the user + * can use the maxWaitTime parameter to set a max waiting time for the + * iteration head. If no data received in the set time, the stream + * terminates. + * + * @return The iterative data stream created. + */ + @PublicEvolving + public PythonIterativeStream iterate() { + return new PythonIterativeStream(this.stream.iterate()); + } + + /** + * A thin wrapper layer over {@link DataStream#iterate(long)}. + * + * <p></p>Initiates an iterative part of the program that feeds back data streams. + * The iterative part needs to be closed by calling + * {@link PythonIterativeStream#close_with(PythonDataStream)}. The transformation of + * this IterativeStream will be the iteration head. The data stream + * given to the {@link PythonIterativeStream#close_with(PythonDataStream)} method is + * the data stream that will be fed back and used as the input for the + * iteration head.</p> + * <p> + * A common usage pattern for streaming iterations is to use output + * splitting to send a part of the closing data stream to the head. Refer to + * {@link #split(OutputSelector)} for more information. 
+ * <p> + * The iteration edge will be partitioned the same way as the first input of + * the iteration head unless it is changed in the + * {@link PythonIterativeStream#close_with(PythonDataStream)} call. + * <p> + * By default a PythonDataStream with iteration will never terminate, but the user + * can use the maxWaitTime parameter to set a max waiting time for the + * iteration head. If no data received in the set time, the stream + * terminates. + * + * @param max_wait_time_ms + * Number of milliseconds to wait between inputs before shutting + * down + * + * @return The iterative data stream created. + */ + @PublicEvolving + public PythonIterativeStream iterate(Long max_wait_time_ms) { + return new PythonIterativeStream(this.stream.iterate(max_wait_time_ms)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonIterativeStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonIterativeStream.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonIterativeStream.java new file mode 100644 index 0000000..ced1c24 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonIterativeStream.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.datastream; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.IterativeStream; +import org.python.core.PyObject; + +/** + * A thin wrapper layer over {@link IterativeStream}. + * + * <p>The python iterative data stream represents the start of an iteration in a + * {@link PythonDataStream}.</p> + */ +@PublicEvolving +public class PythonIterativeStream extends PythonSingleOutputStreamOperator { + + public PythonIterativeStream(IterativeStream<PyObject> iterativeStream) { + super(iterativeStream); + } + + /** + * A thin wrapper layer over {@link IterativeStream#closeWith(org.apache.flink.streaming.api.datastream.DataStream)} + * + * Please note that this function works with {@link PythonDataStream} and thus wherever a DataStream is mentioned in + * the above {@link IterativeStream#closeWith(org.apache.flink.streaming.api.datastream.DataStream)} description, + * the user may regard it as {@link PythonDataStream} . + * + * + * @param feedback_stream + * {@link PythonDataStream} that will be used as input to the iteration + * head. + * + * @return The feedback stream. + * + */ + public PythonDataStream close_with(PythonDataStream<? 
extends DataStream<PyObject>> feedback_stream) { + ((IterativeStream<PyObject>)this.stream).closeWith(feedback_stream.stream); + return feedback_stream; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonKeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonKeyedStream.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonKeyedStream.java new file mode 100644 index 0000000..f259374 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonKeyedStream.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.api.datastream; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.python.api.functions.PyKey; +import org.apache.flink.streaming.python.api.functions.PythonReduceFunction; +import org.python.core.PyObject; + +import java.io.IOException; + + +/** + * A thin wrapper layer over {@link KeyedStream}. + * + * <p>A {@code PythonKeyedStream} represents a {@link PythonDataStream} on which operator state is + * partitioned by key using a provided {@link org.apache.flink.api.java.functions.KeySelector}</p> +@Public +public class PythonKeyedStream extends PythonDataStream<KeyedStream<PyObject, PyKey>> { + + public PythonKeyedStream(KeyedStream<PyObject, PyKey> stream) { + super(stream); + } + + /** + * A thin wrapper layer over {@link KeyedStream#countWindow(long, long)}. + * + * @param size The size of the windows in number of elements. + * @param slide The slide interval in number of elements. + * @return The python windowed stream {@link PythonWindowedStream} + */ + public PythonWindowedStream count_window(long size, long slide) { + return new PythonWindowedStream<GlobalWindow>(this.stream.countWindow(size, slide)); + } + + /** + * A thin wrapper layer over {@link KeyedStream#timeWindow(Time)} + * + * @param size The size of the window. + * @return The python windowed stream {@link PythonWindowedStream} + */ + public PythonWindowedStream time_window(Time size) { + return new PythonWindowedStream<TimeWindow>(this.stream.timeWindow(size)); + } + + /** + * A thin wrapper layer over {@link KeyedStream#timeWindow(Time, Time)} + * + * @param size The size of the window. 
+ * @return The python wrapper {@link PythonWindowedStream} + */ + public PythonWindowedStream time_window(Time size, Time slide) { + return new PythonWindowedStream<TimeWindow>(this.stream.timeWindow(size, slide)); + } + + /** + * A thin wrapper layer over {@link KeyedStream#reduce(ReduceFunction)} + * + * @param reducer + * The {@link ReduceFunction} that will be called for every + * element of the input values with the same key. + * @return The transformed data stream @{link PythonSingleOutputStreamOperator}. + */ + public PythonSingleOutputStreamOperator reduce(ReduceFunction<PyObject> reducer) throws IOException { + return new PythonSingleOutputStreamOperator(this.stream.reduce(new PythonReduceFunction(reducer))); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java new file mode 100644 index 0000000..f701d98 --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.flink.streaming.python.api.datastream;

import org.python.util.PythonObjectInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectStreamClass;

/**
 * A helper class to overcome the inability to set the serialVersionUID in a python user-defined
 * function (UDF).
 *
 * <p>The fact that this field is not set results in a dynamic calculation of the serialVersionUID,
 * using SHA, to make sure it is a unique number. This unique number is a 64-bit hash of the
 * class name, interface class names, methods, and fields. If a Python class inherits from a
 * Java class, as in the case of Python UDFs, then a proxy wrapper class is created. Its name is
 * constructed using the following pattern:
 * <b>{@code org.python.proxies.<module-name>$<UDF-class-name>$<number>}</b>. The {@code <number>}
 * part is increased by one at runtime, for every job submission. This results in a different
 * serial version UID for each run of the same Python class. Therefore, it is required to silently
 * suppress the serial version UID mismatch check.</p>
 */
public class PythonObjectInputStream2 extends PythonObjectInputStream {

	public PythonObjectInputStream2(InputStream in) throws IOException {
		super(in);
	}

	/**
	 * Reads the class descriptor from the stream and, when the locally loaded class has a
	 * different serialVersionUID, substitutes the local descriptor so deserialization does
	 * not fail with an {@code InvalidClassException}.
	 *
	 * @return the class descriptor to use for deserialization (local one on a UID mismatch)
	 * @throws IOException if reading the stream descriptor fails
	 * @throws ClassNotFoundException if the descriptor's class cannot be resolved at all
	 */
	@Override
	protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
		// The descriptor as written into the serialized stream.
		ObjectStreamClass resultClassDescriptor = super.readClassDescriptor();

		Class<?> localClass;
		try {
			localClass = resolveClass(resultClassDescriptor);
		} catch (ClassNotFoundException e) {
			// NOTE(review): diagnostic goes to stdout; consider a proper logger here.
			System.out.println("No local class for " + resultClassDescriptor.getName());
			return resultClassDescriptor;
		}

		ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass);
		// lookup() returns null unless the class implements Serializable.
		if (localClassDescriptor != null) {
			final long localSUID = localClassDescriptor.getSerialVersionUID();
			final long streamSUID = resultClassDescriptor.getSerialVersionUID();
			if (streamSUID != localSUID) {
				// Serial version mismatch: deliberately override with the local class
				// descriptor so Jython proxy classes (whose UID changes every run) load.
				resultClassDescriptor = localClassDescriptor;
			}
		}
		return resultClassDescriptor;
	}
}
+ */ +package org.apache.flink.streaming.python.api.datastream; + +import org.apache.flink.annotation.Public; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.python.core.PyObject; + + +/** + * A thin wrapper layer over {@link SingleOutputStreamOperator} + * + * <p>{@code PythonSingleOutputStreamOperator} represents a user defined transformation + * applied on a {@link PythonDataStream} with one predefined output type.</p> + */ +@Public +public class PythonSingleOutputStreamOperator extends PythonDataStream<SingleOutputStreamOperator<PyObject>> { + public PythonSingleOutputStreamOperator(SingleOutputStreamOperator<PyObject> stream) { + super(stream); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSplitStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSplitStream.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSplitStream.java new file mode 100644 index 0000000..e7621ca --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSplitStream.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.datastream; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.datastream.SplitStream; +import org.python.core.PyObject; + + +/** + * A thin wrapper layer over {@link SplitStream}. + * + * <p>The {@code PythonSplitStream} represents an operator that has been split using an + * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}. Named outputs + * can be selected using the {@link #select} function. To apply transformation on the whole + * output simply call the transformation on the {@code PythonSplitStream}</p> + */ +@PublicEvolving +public class PythonSplitStream extends PythonDataStream<SplitStream<PyObject>> { + + public PythonSplitStream(SplitStream<PyObject> splitStream) { + super(splitStream); + } + + /** + * A thin wrapper layer over {@link SplitStream#select(java.lang.String...)} + * + * @param output_names + * The output names for which the operator will receive the + * input. + * @return Returns the selected {@link PythonDataStream} + */ + public PythonDataStream select(String... 
output_names) { + return new PythonDataStream<>(this.stream.select(output_names)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonWindowedStream.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonWindowedStream.java new file mode 100644 index 0000000..563661e --- /dev/null +++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonWindowedStream.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.streaming.python.api.datastream;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.python.api.functions.PyKey;
import org.apache.flink.streaming.python.api.functions.PythonApplyFunction;
import org.apache.flink.streaming.python.api.functions.PythonReduceFunction;
import org.python.core.PyObject;

import java.io.IOException;

/**
 * A thin wrapper layer over {@link WindowedStream}.
 *
 * <p>A {@code PythonWindowedStream} represents a data stream where elements are grouped by
 * key, and for each key, the stream of elements is split into windows based on a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
 * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.</p>
 */
@Public
public class PythonWindowedStream<W extends Window> {

	private final WindowedStream<PyObject, PyKey, W> stream;

	public PythonWindowedStream(WindowedStream<PyObject, PyKey, W> stream) {
		this.stream = stream;
	}

	/**
	 * A thin wrapper layer over {@link WindowedStream#reduce(org.apache.flink.api.common.functions.ReduceFunction)}.
	 *
	 * @param fun The reduce function.
	 * @return The data stream resulting from applying the reduce function to the window.
	 * @throws IOException if the python reduce function cannot be serialized
	 */
	@SuppressWarnings("unchecked")
	public PythonSingleOutputStreamOperator reduce(ReduceFunction<PyObject> fun) throws IOException {
		PythonReduceFunction wrapped = new PythonReduceFunction(fun);
		return new PythonSingleOutputStreamOperator(stream.reduce(wrapped));
	}

	/**
	 * A thin wrapper layer over {@link WindowedStream#apply(WindowFunction)}.
	 *
	 * @param fun The window function.
	 * @return The data stream resulting from applying the window function to the window.
	 * @throws IOException if the python window function cannot be serialized
	 */
	public PythonSingleOutputStreamOperator apply(
		WindowFunction<PyObject, PyObject, Object, W> fun) throws IOException {
		PythonApplyFunction<W> wrapped = new PythonApplyFunction<>(fun);
		return new PythonSingleOutputStreamOperator(stream.apply(wrapped));
	}
}
+ */ + +package org.apache.flink.streaming.python.api.environment; + + +public class PythonEnvironmentConfig { + public static final String FLINK_PYTHON_DC_ID = "flink"; + + public static final String FLINK_PYTHON_PLAN_NAME = "plan.py"; + + /** + * Holds the path for the local python files cache. Is is set only on the client side by + * the python streaming plan binder. + */ + public static String pythonTmpCachePath; + + /** + * Holds the path in the shared storage at which the python script(s) reside. It is set on the client side + * within the execution process. + */ + public static String FLINK_HDFS_PATH = "hdfs:///tmp/flink"; // "file:/tmp/flink" + +}
