[
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019490#comment-16019490
]
ASF GitHub Bot commented on FLINK-5886:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3838#discussion_r117722927
--- Diff: docs/dev/stream/python.md ---
@@ -0,0 +1,649 @@
+---
+title: "Python Programming Guide (Streaming)"
+is_beta: true
+nav-title: Python API
+nav-parent_id: streaming
+nav-pos: 63
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Streaming analysis programs in Flink are regular programs that implement
+transformations on streaming data sets (e.g., filtering, mapping, joining,
+grouping). The streaming data sets are initially created from certain
+sources (e.g., by reading from Apache Kafka, reading files, or from
+collections). Results are returned via sinks, which may, for example,
+write the data to (distributed) files or to standard output (for example,
+the command line terminal). Flink streaming programs run in a variety of
+contexts: standalone, or embedded in other programs. The execution can
+happen in a local JVM or on clusters of many machines.
+
+In order to create your own Flink streaming program, we encourage you to
+start with the [program skeleton](#program-skeleton) and gradually add
+your own [transformations](#transformations). The remaining sections act
+as references for additional operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Jython Framework
+---------------
+The Flink Python streaming API uses the Jython framework (see
+<http://www.jython.org/archive/21/docs/whatis.html>) to drive the
+execution of a given script. The Python streaming layer is actually a
+thin wrapper around the existing Java streaming APIs.
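+
+Because the script is executed on the JVM, Flink's Java classes are
+imported directly as regular Python modules; for example, the WordCount
+program below begins with imports such as:
+
+{% highlight python %}
+# Jython resolves Flink's Java packages like ordinary Python modules
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.python.api.jython import PythonStreamExecutionEnvironment
+{% endhighlight %}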
+
+#### Constraints
+There are two main constraints for using Jython:
+
+* The latest supported Python version is 2.7
+* It is not straightforward to use Python C extensions
+
+Streaming Program Example
+-------------------------
+The following streaming program is a complete, working example of
+WordCount. You can copy & paste the code and run it locally (see the
+notes later in this section). It counts the occurrences of each word
+(case insensitive) in a stream of sentences, over windows of 50
+milliseconds, and prints the results to standard output.
+
+{% highlight python %}
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.python.api.jython import PythonStreamExecutionEnvironment
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+
+class Generator(SourceFunction):
+    def __init__(self, num_iters):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            ctx.collect('Hello World')
+            counter += 1
+
+    def cancel(self):
+        self._running = False
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+
+def main():
+    env = PythonStreamExecutionEnvironment.get_execution_environment()
+    env.create_python_source(Generator(num_iters=1000)) \
+        .flat_map(Tokenizer()) \
+        .key_by(Selector()) \
+        .time_window(milliseconds(50)) \
+        .reduce(Sum()) \
+        .print()
+    env.execute()
+
+
+if __name__ == '__main__':
+    main()
+{% endhighlight %}
+
+**Notes:**
+
+- If execution is done on a local cluster, you may replace the last line
+  in the `main()` function with **`env.execute(True)`**.
+- Execution on a multi-node cluster requires shared storage (e.g., HDFS),
+  which needs to be configured upfront.
+- The output of the given script is directed to standard output.
+  Consequently, the output is written to the corresponding worker's
+  `.out` file. If the script is executed inside the IntelliJ IDE, the
+  output will be displayed in the console tab.
+
+{% top %}
+
+Program Skeleton
+----------------
+As we already saw in the example, Flink streaming programs look like
+regular Python programs.
+Each program consists of the same basic parts:
+
+1. A `main()` function definition, without arguments - the program entry point,
+2. Obtain an `Environment`,
+3. Load/create the initial data,
+4. Specify transformations on this data,
+5. Specify where to put the results of your computations, and
+6. Execute your program.
+
+We will now give an overview of each of those steps, but please refer
+to the respective sections for more details.
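+
+Put together, a minimal skeleton might look like this (a sketch reusing
+the `Tokenizer` class from the WordCount example above):
+
+{% highlight python %}
+# 1. the main() function is the program entry point
+def main():
+    # 2. obtain an Environment
+    env = PythonStreamExecutionEnvironment.get_execution_environment()
+    # 3. load/create the initial data
+    text = env.read_text_file("file:///path/to/file")
+    # 4. specify transformations on this data
+    tokens = text.flat_map(Tokenizer())
+    # 5. specify where to put the results
+    tokens.print()
+    # 6. execute the program
+    env.execute()
+
+
+if __name__ == '__main__':
+    main()
+{% endhighlight %}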
+
+The `main()` function is required; it is used by the Flink execution
+layer to run the given Python streaming program.
+
+The `Environment` is the basis for all Flink programs. You can obtain
+one using this static method on the class `PythonStreamExecutionEnvironment`:
+
+{% highlight python %}
+get_execution_environment()
+{% endhighlight %}
+
+For specifying data sources, the streaming execution environment has
+several methods. To just read a text file as a sequence of lines, you
+can use:
+
+{% highlight python %}
+env = get_execution_environment()
+text = env.read_text_file("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply
+transformations. For more information on data sources and input formats,
+please refer to [Data Sources](#data-sources).
+
+Once you have a DataStream you can apply transformations to create a new
+DataStream which you can then write to a file, transform again, or
+combine with other DataStreams. You apply transformations by calling
+methods on DataStream with your own custom transformation function. For
+example, a map transformation looks like this:
+
+{% highlight python %}
+class Doubler(MapFunction):
+    def map(self, value):
+        return value * 2
+
+data.map(Doubler())
+{% endhighlight %}
+
+This will create a new DataStream by doubling every value in the
+original DataStream. For more information and a list of all the
+transformations, please refer to [Transformations](#transformations).
+
+Once you have a DataStream that needs to be written to disk you can call
+one of these methods on DataStream:
+
+{% highlight python %}
+data.write_as_text("<file-path>")
+data.write_as_text("<file-path>", mode=WriteMode.OVERWRITE)
+data.print()
+{% endhighlight %}
+
+The last method is only useful for developing/debugging on a local
+machine; it will output the contents of the DataStream to standard
+output. (Note that in a cluster, the result goes to the standard out
+stream of the cluster nodes and ends up in the *.out* files of the
+workers.) The first two do as their names suggest.
+Please refer to [Data Sinks](#data-sinks) for more information on
+writing to files.
+
+Once you have specified the complete program you need to call `execute`
+on the `Environment`. This will either execute on your local machine or
+submit your program for execution on a cluster, depending on how Flink
+was started. You can force a local execution by using `execute(True)`.
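+
+For example, the end of a `main()` function might look like this (a
+minimal sketch; the sources and transformations in between are elided):
+
+{% highlight python %}
+env = PythonStreamExecutionEnvironment.get_execution_environment()
+# ... define sources and transformations here ...
+
+env.execute()        # execute locally or on a cluster, depending on how Flink was started
+# env.execute(True)  # force a local execution
+{% endhighlight %}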
+
+{% top %}
+
+Project setup
+---------------
+
+Apart from setting up Flink, no additional work is required. Since
+Jython is used to execute the Python script, no external packages are
+needed and the program is executed as if it were a jar file.
+
+The Python API was tested on Linux/OSX systems.
+
+{% top %}
+
+Lazy Evaluation
+---------------
+
+All Flink programs are executed lazily: when the program's main method
+is executed, the data loading and transformations do not happen
+directly. Rather, each operation is created and added to the program's
+plan. The operations are actually executed when one of the `execute()`
+methods is invoked on the Environment object. Whether the program is
+executed locally or on a cluster depends on the environment of the
+program.
+
+The lazy evaluation lets you construct sophisticated programs that Flink
+executes as one holistically planned unit.
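+
+As a small illustration (a sketch reusing the `Doubler` map function
+from above), defining a transformation does not run it; the pipeline
+only runs once `execute()` is called:
+
+{% highlight python %}
+env = PythonStreamExecutionEnvironment.get_execution_environment()
+text = env.read_text_file("file:///path/to/file")
+
+# Nothing has been computed yet; these calls only extend the program's plan.
+doubled = text.map(Doubler())
+doubled.print()
+
+# Only now is the plan actually executed.
+env.execute()
+{% endhighlight %}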
+
+{% top %}
+
+Transformations
+---------------
+
+Data transformations transform one or more DataStreams into a new
+DataStream. Programs can combine multiple transformations into
+sophisticated assemblies.
+
+This section gives a brief overview of the available transformations.
+The [transformations documentation](dataset_transformations.html) has a
+full description of all transformations with examples.
+
+<br />
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Transformation</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td><strong>Map</strong><br>PythonDataStream → PythonDataStream</td>
+ <td>
+ <p>Takes one element and produces one element.</p>
+{% highlight python %}
+class Doubler(MapFunction):
+    def map(self, value):
+        return value * 2
+
+data_stream.map(Doubler())
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>FlatMap</strong><br>PythonDataStream → PythonDataStream</td>
+ <td>
+ <p>Takes one element and produces zero, one, or more elements. </p>
+{% highlight python %}
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, word, collector):
+        collector.collect((1, word))
+
+data_stream.flat_map(Tokenizer())
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Filter</strong><br>PythonDataStream → PythonDataStream</td>
+ <td>
+ <p>Evaluates a boolean function for each element and retains those
+ for which the function returns true.</p>
+{% highlight python %}
+class GreaterThan1000(FilterFunction):
+    def filter(self, value):
+        return value > 1000
+
+data_stream.filter(GreaterThan1000())
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>KeyBy</strong><br>PythonDataStream → PythonKeyedStream</td>
+ <td>
+ <p>Logically partitions a stream into disjoint partitions, each
+ partition containing elements of the same key. Internally, this is
+ implemented with hash partitioning. See <a href="/dev/api_concepts#specifying-keys">keys</a>
+ on how to specify keys. This transformation returns a PythonKeyedStream.</p>
+ {% highlight python %}
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]  # key by the second element of the tuple
+
+data_stream.key_by(Selector())
+ {% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Reduce</strong><br>PythonKeyedStream → PythonDataStream</td>
+ <td>
+ <p>A "rolling" reduce on a keyed data stream. Combines the current
+ element with the last reduced value and emits the new value.</p>
+{% highlight python %}
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+data.reduce(Sum())
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Window</strong><br>PythonKeyedStream → PythonWindowedStream</td>
+ <td>
+ <p>Windows can be defined on already partitioned KeyedStreams. Windows
+ group the data in each key according to some characteristic (e.g., the
+ data that arrived within the last 5 seconds). See
+ <a href="windows.html">windows</a> for a complete description of windows.
+ {% highlight python %}
+keyed_stream.count_window(10, 5)  # Last 10 elements, sliding (jumping) by 5 elements
+
+keyed_stream.time_window(milliseconds(30))  # Last 30 milliseconds of data
+
+keyed_stream.time_window(milliseconds(100), milliseconds(20))  # Last 100 milliseconds of data, sliding (jumping) by 20 milliseconds
+ {% endhighlight %}
+ </p>
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Window Apply</strong><br>PythonWindowedStream → PythonDataStream</td>
+ <td>
+ <p>Applies a general function to the window as a whole. Below is a
+ function that manually sums the elements of a window.</p>
+ {% highlight python %}
+class WindowSum(WindowFunction):
+    def apply(self, key, window, values, collector):
+        total = 0
+        for value in values:
+            total += value[0]
+        collector.collect((key, total))
+
+windowed_stream.apply(WindowSum())
+ {% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Window Reduce</strong><br>PythonWindowedStream → PythonDataStream</td>
+ <td>
+ <p>Applies a functional reduce function to the window and returns the
+ reduced value.</p>
+ {% highlight python %}
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+windowed_stream.reduce(Sum())
+ {% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Union</strong><br>PythonDataStream* → PythonDataStream</td>
+ <td>
+ <p>Union of two or more data streams, creating a new stream containing
+ all the elements from all the streams. Note: if you union a data
+ stream with itself you will get each element twice in the resulting
+ stream.</p>
+ {% highlight python %}
+data_stream.union(other_stream1, other_stream2, ...)
+ {% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Split</strong><br>PythonDataStream → PythonSplitStream</td>
+ <td>
+ <p>Split the stream into two or more streams according to some criterion.
+ {% highlight python %}
+class StreamSelector(OutputSelector):
+    def select(self, value):
+        return ["even"] if value % 2 == 0 else ["odd"]
+
+split_stream = data_stream.split(StreamSelector())
+ {% endhighlight %}
+ </p>
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Select</strong><br>PythonSplitStream → PythonDataStream</td>
+ <td>
+ <p> Select one or more streams from a split stream.
+ {% highlight python %}
+even_data_stream = split_stream.select("even")
+odd_data_stream = split_stream.select("odd")
+all_data_stream = split_stream.select("even", "odd")
+ {% endhighlight %}
+ </p>
+ </td>
+ </tr>
+
+ <tr>
+ <td><strong>Iterate</strong><br>PythonDataStream → PythonIterativeStream → PythonDataStream</td>
+ <td>
+ <p>Creates a "feedback" loop in the flow, by redirecting the output of
+ one operator to some previous operator. This is especially useful for
+ defining algorithms that continuously update a model. The following
+ code starts with a stream and applies the iteration body continuously.
+ Elements that are greater than 0 are sent back to the feedback
+ channel, and the rest of the elements are forwarded downstream. See
+ <a href="#iterations">iterations</a> for a complete description.
+ {% highlight python %}
+class MinusOne(MapFunction):
+    def map(self, value):
+        return value - 1
+
+class PositiveNumber(FilterFunction):
+    def filter(self, value):
+        return value > 0
+
+class LessEqualToZero(FilterFunction):
+    def filter(self, value):
+        return value <= 0
+
+iteration = initial_stream.iterate(5000)
+iteration_body = iteration.map(MinusOne())
+feedback = iteration_body.filter(PositiveNumber())
+iteration.close_with(feedback)
+output = iteration_body.filter(LessEqualToZero())
+ {% endhighlight %}
+ </p>
+ </td>
+ </tr>
+
+ </tbody>
+</table>
+
+{% top %}
+
+Passing Functions to Flink
+--------------------------
+
+Certain operations require user-defined functions as arguments. All
+such functions should be defined as Python classes that derive from the
+relevant Flink function. User-defined functions are serialized and sent
+over to the TaskManagers for execution.
+
+{% highlight python %}
+class Filter(FilterFunction):
+    def filter(self, value):
+        return value > 5
+
+data_stream.filter(Filter())
+{% endhighlight %}
+
+Rich functions (e.g. `RichFileterFunction`) allow you to define
+(override) the optional operations `open` & `close`.
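+
+A minimal sketch of a rich function (assuming the corrected class name
+`RichFilterFunction` is importable like the other function classes, and
+that `open` receives the Java `Configuration` object, mirroring the
+Java API):
+
+{% highlight python %}
+class ThresholdFilter(RichFilterFunction):
+    def open(self, config):
+        # One-time initialization before any elements are processed.
+        self._threshold = 5
+
+    def filter(self, value):
+        return value > self._threshold
+
+    def close(self):
+        # One-time cleanup after the last element has been processed.
+        pass
+
+data_stream.filter(ThresholdFilter())
+{% endhighlight %}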
--- End diff --
typo: Fileter -> Filter
> Python API for streaming applications
> -------------------------------------
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
> Issue Type: New Feature
> Components: Python API
> Reporter: Zohar Mizrahi
> Assignee: Zohar Mizrahi
>
> A work in progress to provide a Python interface for Flink streaming APIs. The
> core technology is based on Jython and thus imposes two limitations: a. user
> defined functions cannot use Python extensions. b. the Python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, one can use the IntelliJ IDE. Assuming IntelliJ was
> set up properly (see:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
> one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}},
> which in turn will execute all the tests under
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)