Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3838#discussion_r117722927
--- Diff: docs/dev/stream/python.md ---
@@ -0,0 +1,649 @@
+---
+title: "Python Programming Guide (Streaming)"
+is_beta: true
+nav-title: Python API
+nav-parent_id: streaming
+nav-pos: 63
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Analysis streaming programs in Flink are regular programs that implement
transformations on
+streaming data sets (e.g., filtering, mapping, joining, grouping). The
streaming data sets are initially
+created from certain sources (e.g., by reading from Apache Kafka, or
reading files, or from collections).
+Results are returned via sinks, which may for example write the data to
(distributed) files, or to
+standard output (for example the command line terminal). Flink streaming
programs run in a variety
+of contexts, standalone, or embedded in other programs. The execution can
happen in a local JVM, or
+on clusters of many machines.
+
+In order to create your own Flink streaming program, we encourage you to
start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as
references for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Jython Framework
+---------------
+Flink Python streaming API uses Jython framework (see
<http://www.jython.org/archive/21/docs/whatis.html>)
+to drive the execution of a given script. The Python streaming layer, is
actually a thin wrapper layer for the
+existing Java streaming APIs.
+
+#### Constraints
+There are two main constraints for using Jython:
+
+* The latest Python supported version is 2.7
+* It is not straightforward to use Python C extensions
+
+Streaming Program Example
+-------------------------
+The following streaming program is a complete, working example of
WordCount. You can copy & paste the code
+to run it locally (see notes later in this section). It counts the number
of each word (case insensitive)
+in a stream of sentences, on a window size of 50 milliseconds and prints
the results into the standard output.
+
+{% highlight python %}
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.api.common.functions import FlatMapFunction,
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.python.api.jython import
PythonStreamExecutionEnvironment
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+
+class Generator(SourceFunction):
+ def __init__(self, num_iters):
+ self._running = True
+ self._num_iters = num_iters
+
+ def run(self, ctx):
+ counter = 0
+ while self._running and counter < self._num_iters:
+ ctx.collect('Hello World')
+ counter += 1
+
+ def cancel(self):
+ self._running = False
+
+
+class Tokenizer(FlatMapFunction):
+ def flatMap(self, value, collector):
+ for word in value.lower().split():
+ collector.collect((1, word))
+
+
+class Selector(KeySelector):
+ def getKey(self, input):
+ return input[1]
+
+
+class Sum(ReduceFunction):
+ def reduce(self, input1, input2):
+ count1, word1 = input1
+ count2, word2 = input2
+ return (count1 + count2, word1)
+
+def main():
+ env = PythonStreamExecutionEnvironment.get_execution_environment()
+ env.create_python_source(Generator(num_iters=1000)) \
+ .flat_map(Tokenizer()) \
+ .key_by(Selector()) \
+ .time_window(milliseconds(50)) \
+ .reduce(Sum()) \
+ .print()
+ env.execute()
+
+
+if __name__ == '__main__':
+ main()
+{% endhighlight %}
+
+**Notes:**
+
+- If execution is done on a local cluster, you may replace the last line
in the `main()` function
+ with **`env.execute(True)`**
+- Execution on a multi-node cluster requires a shared medium storage,
which needs to be configured (.e.g HDFS)
+ upfront.
+- The output from of the given script is directed to the standard output.
Consequently, the output
+ is written to the corresponding worker `.out` filed. If the script is
executed inside the IntelliJ IDE,
+ then the output will be displayed in the console tab.
+
+{% top %}
+
+Program Skeleton
+----------------
+As we already saw in the example, Flink streaming programs look like
regular Python programs.
+Each program consists of the same basic parts:
+
+1. A `main()` function definition, without arguments - the program entry
point,
+2. Obtain an `Environment`,
+3. Load/create the initial data,
+4. Specify transformations on this data,
+5. Specify where to put the results of your computations, and
+5. Execute your program.
+
+We will now give an overview of each of those steps but please refer to
the respective sections for
+more details.
+
+The `main()` function is a must and it is used by Flink execution layer to
run the
+given Python streaming program.
+
+The `Environment` is the basis for all Flink programs. You can
+obtain one using these static methods on class
`PythonStreamExecutionEnvironment`:
+
+{% highlight python %}
+get_execution_environment()
+{% endhighlight %}
+
+For specifying data sources the streaming execution environment has
several methods.
+To just read a text file as a sequence of lines, you can use:
+
+{% highlight python %}
+env = get_execution_environment()
+text = env.read_text_file("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply
transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataStream you can apply transformations to create a new
+DataStream which you can then write to a file, transform again, or
+combine with other DataStreams. You apply transformations by calling
+methods on DataStream with your own custom transformation function. For
example,
+a map transformation looks like this:
+
+{% highlight python %}
+class Doubler(MapFunction):
+ def map(self, value):
+ return value * 2
+
+data.map(Doubler())
+{% endhighlight %}
+
+This will create a new DataStream by doubling every value in the original
DataStream.
+For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataStream that needs to be written to disk you can call
one
+of these methods on DataStream:
+
+{% highlight python %}
+data.write_as_text("<file-path>")
+data.write_as_text("<file-path>", mode=WriteMode.OVERWRITE)
+data.print()
+{% endhighlight %}
+
+The last method is only useful for developing/debugging on a local machine,
+it will output the contents of the DataSet to standard output. (Note that
in
+a cluster, the result goes to the standard out stream of the cluster nodes
and ends
+up in the *.out* files of the workers).
+The first two do as the name suggests.
+Please refer to [Data Sinks](#data-sinks) for more information on writing
to files.
+
+Once you specified the complete program you need to call `execute` on
+the `Environment`. This will either execute on your local machine or
submit your program
+for execution on a cluster, depending on how Flink was started. You can
force
+a local execution by using `execute(True)`.
+
+{% top %}
+
+Project setup
+---------------
+
+Apart from setting up Flink, no additional work is required. Using Jython
to execute the Python
+script, means that no external packages are needed and the program is
executed as if it was a jar file.
+
+The Python API was tested on Linux/OSX systems.
+
+{% top %}
+
+Lazy Evaluation
+---------------
+
+All Flink programs are executed lazily: When the program's main method is
executed, the data loading
+and transformations do not happen directly. Rather, each operation is
created and added to the
+program's plan. The operations are actually executed when one of the
`execute()` methods is invoked
+on the Environment object. Whether the program is executed locally or on a
cluster depends
+on the environment of the program.
+
+The lazy evaluation lets you construct sophisticated programs that Flink
executes as one
+holistically planned unit.
+
+{% top %}
+
+Transformations
+---------------
+
+Data transformations transform one or more DataStreams into a new
DataStream. Programs can combine
+multiple transformations into sophisticated assemblies.
+
+This section gives a brief overview of the available transformations. The
[transformations
+documentation](dataset_transformations.html) has a full description of all
transformations with
+examples.
+
+<br />
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Transformation</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td><strong>Map</strong><br>PythonDataStream →
PythonDataStream</td>
+ <td>
+ <p>Takes one element and produces one element.</p>
+{% highlight python %}
+class Doubler(MapFunction):
+ def map(self, value):
+ return value * 2
+
+data_stream.map(Doubler())
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>FlatMap</strong><br>PythonDataStream →
PythonDataStream</td>
+ <td>
+ <p>Takes one element and produces zero, one, or more elements. </p>
+{% highlight python %}
+class Tokenizer(FlatMapFunction):
+ def flatMap(self, word, collector):
+ collector.collect((1, word))
+
+data_stream.flat_map(Tokenizer())
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Filter</strong><br>PythonDataStream →
PythonDataStream</td>
+ <td>
+ <p>Evaluates a boolean function for each element and retains those
for which the function
+ returns true.</p>
+{% highlight python %}
+class GreaterThen1000(FilterFunction):
+ def filter(self, value):
+ return value > 1000
+
+data_stream.filter(GreaterThen1000())
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>KeyBy</strong><br>PythonDataStream →
PythonKeyedStream</td>
+ <td>
+ <p>Logically partitions a stream into disjoint partitions, each
partition containing elements of the same key.
+ Internally, this is implemented with hash partitioning. See <a
href="/dev/api_concepts#specifying-keys">keys</a> on how to specify keys.
+ This transformation returns a PythonKeyedDataStream.</p>
+ {% highlight python %}
+class Selector(KeySelector):
+ def getKey(self, input):
+ return input[1] # Key by the second element in a tuple
+
+data_stream.key_by(Selector()) // Key by field "someKey"
+ {% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Reduce</strong><br>PythonKeyedStream →
PythonDataStream</td>
+ <td>
+ <p>A "rolling" reduce on a keyed data stream. Combines the current
element with the last reduced value and
+ emits the new value.</p>
+{% highlight python %}
+class Sum(ReduceFunction):
+ def reduce(self, input1, input2):
+ count1, val1 = input1
+ count2, val2 = input2
+ return (count1 + count2, val1)
+
+data.reduce(Sum())
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Window</strong><br>PythonKeyedStream →
PythonWindowedStream</td>
+ <td>
+ <p>Windows can be defined on already partitioned KeyedStreams.
Windows group the data in each
+ key according to some characteristic (e.g., the data that arrived
within the last 5 seconds).
+ See <a href="windows.html">windows</a> for a complete description
of windows.
+ {% highlight python %}
+keyed_stream.count_window(10, 5) # Last 10 elements, sliding (jumping) by
5 elements
+
+keyed_stream.time_window(milliseconds(30)) # Last 30 milliseconds of data
+
+keted_stream.time_window(milliseconds(100), milliseconds(20)) # Last 100
milliseconds of data, sliding (jumping) by 20 milliseconds
+ {% endhighlight %}
+ </p>
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Window Apply</strong><br>PythonWindowedStream →
PythonDataStream</td>
+ <td>
+ <p>Applies a general function to the window as a whole. Below is a
function that manually sums
+ the elements of a window.</p>
+ {% highlight python %}
+class WindowSum(WindowFunction):
+ def apply(self, key, window, values, collector):
+ sum = 0
+ for value in values:
+ sum += value[0]
+ collector.collect((key, sum))
+
+windowed_stream.apply(WindowSum())
+ {% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Window Reduce</strong><br>PythonWindowedStream →
PythonDataStream</td>
+ <td>
+ <p>Applies a functional reduce function to the window and returns
the reduced value.</p>
+ {% highlight python %}
+class Sum(ReduceFunction):
+ def reduce(self, input1, input2):
+ count1, val1 = input1
+ count2, val2 = input2
+ return (count1 + count2, val1)
+
+windowed_stream.reduce(Sum())
+ {% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Union</strong><br>PythonDataStream* →
PythonDataStream</td>
+ <td>
+ <p>Union of two or more data streams creating a new stream
containing all the elements from all
+ the streams. Note: If you union a data stream with itself you
will get each element twice
+ in the resulting stream.</p>
+ {% highlight python %}
+data_stream.union(other_stream1, other_stream2, ...);
+ {% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Split</strong><br>PythonDataStream →
PythonSplitStream</td>
+ <td>
+ <p>Split the stream into two or more streams according to some
criterion.
+ {% highlight python %}
+class StreamSelector(OutputSelector):
+ def select(self, value):
+ return ["even"] if value % 2 == 0 else ["odd"]
+
+splited_stream = data_stream.split(StreamSelector())
+ {% endhighlight %}
+ </p>
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Select</strong><br>SplitStream → DataStream</td>
+ <td>
+ <p> Select one or more streams from a split stream.
+ {% highlight python %}
+even_data_stream = splited_stream.select("even")
+odd_data_stream = splited_stream.select("odd")
+all_data_stream = splited_stream.select("even", "odd")
+ {% endhighlight %}
+ </p>
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Iterate</strong><br>PythonDataStream →
PythonIterativeStream → PythonDataStream</td>
+ <td>
+ <p> Creates a "feedback" loop in the flow, by redirecting the
output of one operator
+ to some previous operator. This is especially useful for
defining algorithms that
+ continuously update a model. The following code starts with a
stream and applies
+ the iteration body continuously. Elements that are greater
than 0 are sent back
+ to the feedback channel, and the rest of the elements are
forwarded downstream.
+ See <a href="#iterations">iterations</a> for a complete
description.
+ {% highlight java %}
+class MinusOne(MapFunction):
+ def map(self, value):
+ return value - 1
+
+class PositiveNumber(FilterFunction):
+ def filter(self, value):
+ return value > 0
+
+class LessEquelToZero(FilterFunction):
+ def filter(self, value):
+ return value <= 0
+
+iteration = initial_stream.iterate(5000)
+iteration_body = iteration.map(MinusOne())
+feedback = iteration_body.filter(PositiveNumber())
+iteration.close_with(feedback)
+output = iteration_body.filter(LessEquelToZero())
+ {% endhighlight %}
+ </p>
+ </td>
+ </tr>
+
+ </tbody>
+</table>
+
+{% top %}
+
+Passing Functions to Flink
+--------------------------
+
+Certain operations require user-defined functions as arguments. All the
functions should be
+defined as Python classes that derived from the relevant Flink function.
User-defined functions
+are serialized and sent over to the TaskManagers for execution.
+
+{% highlight python %}
+class Filter(FilterFunction):
+ def filter(self, value):
+ return value > 5
+
+data_stream.filter(Filter())
+{% endhighlight %}
+
+Rich functions (.e.g `RichFileterFunction`) enable to define (override)
the optional operations: `open` & `close`.
--- End diff --
type: Fileter -> Filter
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---