[FLINK-5886][py] Add Python Streaming API

This closes #3838.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00284fb8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00284fb8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00284fb8

Branch: refs/heads/master
Commit: 00284fb8131ecb064a08a2da010d27ae95806744
Parents: 1809e9f
Author: Zohar Mizrahi <[email protected]>
Authored: Tue Nov 15 13:46:36 2016 +0100
Committer: zentol <[email protected]>
Committed: Sat Feb 17 15:59:56 2018 +0100

----------------------------------------------------------------------
 docs/.gitignore                                 |   6 +
 docs/dev/stream/python.md                       | 649 +++++++++++++++++++
 flink-dist/pom.xml                              |   7 +
 flink-dist/src/main/assemblies/bin.xml          |  18 +-
 .../src/main/flink-bin/bin/pyflink-stream.sh    |  25 +
 .../src/main/python/fibonacci.py                | 134 ++++
 flink-libraries/flink-streaming-python/pom.xml  | 104 +++
 .../python/api/PythonStreamBinder.java          | 196 ++++++
 .../python/api/datastream/PythonDataStream.java | 269 ++++++++
 .../api/datastream/PythonIterativeStream.java   |  57 ++
 .../api/datastream/PythonKeyedStream.java       |  88 +++
 .../datastream/PythonObjectInputStream2.java    |  68 ++
 .../PythonSingleOutputStreamOperator.java       |  37 ++
 .../api/datastream/PythonSplitStream.java       |  51 ++
 .../api/datastream/PythonWindowedStream.java    |  69 ++
 .../environment/PythonEnvironmentConfig.java    |  39 ++
 .../PythonStreamExecutionEnvironment.java       | 421 ++++++++++++
 .../streaming/python/api/functions/PyKey.java   |  54 ++
 .../api/functions/PythonApplyFunction.java      |  77 +++
 .../api/functions/PythonFilterFunction.java     |  72 ++
 .../api/functions/PythonFlatMapFunction.java    |  78 +++
 .../api/functions/PythonGeneratorFunction.java  |  65 ++
 .../api/functions/PythonIteratorFunction.java   |  69 ++
 .../python/api/functions/PythonKeySelector.java |  53 ++
 .../python/api/functions/PythonMapFunction.java |  70 ++
 .../api/functions/PythonOutputSelector.java     |  62 ++
 .../api/functions/PythonReduceFunction.java     |  53 ++
 .../api/functions/PythonSinkFunction.java       |  72 ++
 .../python/api/functions/UtilityFunctions.java  | 143 ++++
 .../connectors/PythonFlinkKafkaConsumer09.java  |  38 ++
 .../connectors/PythonFlinkKafkaProducer09.java  |  44 ++
 .../streaming/python/util/PythonCollector.java  |  47 ++
 .../streaming/python/util/PythonIterator.java   |  28 +
 .../util/serialization/PyObjectSerializer.java  |  59 ++
 .../PythonDeserializationSchema.java            |  62 ++
 .../PythonSerializationSchema.java              |  53 ++
 .../util/serialization/SerializationUtils.java  |  46 ++
 .../python/api/PythonStreamBinderTest.java      | 100 +++
 .../flink/streaming/python/api/run_all_tests.py |  75 +++
 .../flink/streaming/python/api/test_filter.py   |  82 +++
 .../streaming/python/api/test_flatmap_int.py    |  77 +++
 .../python/api/test_flatmap_list_int.py         |  76 +++
 .../streaming/python/api/test_flatmap_string.py |  76 +++
 .../python/api/test_from_collection.py          |  67 ++
 .../streaming/python/api/test_from_elements.py  |  67 ++
 .../streaming/python/api/test_from_iterator.py  |  84 +++
 .../python/api/test_generate_sequence.py        |  65 ++
 .../streaming/python/api/test_iterations.py     |  69 ++
 .../flink/streaming/python/api/test_kafka09.py  | 157 +++++
 .../python/api/test_keyed_stream_reduce.py      |  69 ++
 .../flink/streaming/python/api/test_map.py      |  83 +++
 .../flink/streaming/python/api/test_map_int.py  |  77 +++
 .../streaming/python/api/test_read_text_file.py |  82 +++
 .../python/api/test_socket_text_stream.py       |  95 +++
 .../streaming/python/api/test_split_select.py   |  80 +++
 .../flink/streaming/python/api/test_union.py    |  77 +++
 .../streaming/python/api/test_user_type.py      |  86 +++
 .../streaming/python/api/test_window_apply.py   |  72 ++
 .../streaming/python/api/test_word_count.py     |  84 +++
 .../streaming/python/api/test_write_as_text.py  |  68 ++
 .../python/api/test_write_to_socket.py          | 106 +++
 .../streaming/python/api/utils/__init__.py      |  17 +
 .../streaming/python/api/utils/constants.py     |  21 +
 .../python/api/utils/pygeneratorbase.py         |  36 +
 .../python/api/utils/python_test_base.py        |  35 +
 .../flink/streaming/python/api/utils/utils.py   |  47 ++
 flink-libraries/pom.xml                         |   1 +
 67 files changed, 5613 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/docs/.gitignore
----------------------------------------------------------------------
diff --git a/docs/.gitignore b/docs/.gitignore
new file mode 100644
index 0000000..98b6f6b
--- /dev/null
+++ b/docs/.gitignore
@@ -0,0 +1,6 @@
+.bundle/
+.jekyll-metadata
+.rubydeps/
+content/
+ruby2/.bundle/
+ruby2/.rubydeps/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/docs/dev/stream/python.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/python.md b/docs/dev/stream/python.md
new file mode 100644
index 0000000..02edbfa
--- /dev/null
+++ b/docs/dev/stream/python.md
@@ -0,0 +1,649 @@
+---
+title: "Python Programming Guide (Streaming)"
+is_beta: true
+nav-title: Python API
+nav-parent_id: streaming
+nav-pos: 63
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Streaming analysis programs in Flink are regular programs that implement 
transformations on
+streaming data sets (e.g., filtering, mapping, joining, grouping). The 
streaming data sets are initially
+created from certain sources (e.g., by reading from Apache Kafka, or reading 
files, or from collections).
+Results are returned via sinks, which may for example write the data to 
(distributed) files, or to
+standard output (for example the command line terminal). Flink streaming 
programs run in a variety
+of contexts, standalone, or embedded in other programs. The execution can 
happen in a local JVM, or
+on clusters of many machines.
+
+In order to create your own Flink streaming program, we encourage you to start 
with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references 
for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Jython Framework
+---------------
+Flink Python streaming API uses Jython framework (see 
<http://www.jython.org/archive/21/docs/whatis.html>)
+to drive the execution of a given script. The Python streaming layer, is 
actually a thin wrapper layer for the
+existing Java streaming APIs.
+
+#### Constraints
+There are two main constraints for using Jython:
+
+* The latest supported Python version is 2.7
+* It is not straightforward to use Python C extensions
+
+Streaming Program Example
+-------------------------
+The following streaming program is a complete, working example of WordCount. 
You can copy &amp; paste the code
+to run it locally (see notes later in this section). It counts the number of 
each word (case insensitive)
+in a stream of sentences, on a window size of 50 milliseconds and prints the 
results into the standard output.
+
+{% highlight python %}
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.python.api.jython import PythonStreamExecutionEnvironment
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+
+class Generator(SourceFunction):
+    def __init__(self, num_iters):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            ctx.collect('Hello World')
+            counter += 1
+
+    def cancel(self):
+        self._running = False
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+def main():
+    env = PythonStreamExecutionEnvironment.get_execution_environment()
+    env.create_python_source(Generator(num_iters=1000)) \
+        .flat_map(Tokenizer()) \
+        .key_by(Selector()) \
+        .time_window(milliseconds(50)) \
+        .reduce(Sum()) \
+        .print()
+    env.execute()
+
+
+if __name__ == '__main__':
+    main()
+{% endhighlight %}
+
+**Notes:**
+
+- If execution is done on a local cluster, you may replace the last line in 
the `main()` function
+  with **`env.execute(True)`**
+- Execution on a multi-node cluster requires a shared storage medium, which 
needs to be configured (e.g. HDFS)
+  upfront.
+- The output of the given script is directed to the standard output. 
Consequently, the output
+  is written to the corresponding worker `.out` file. If the script is 
executed inside the IntelliJ IDE,
+  then the output will be displayed in the console tab.
+
+{% top %}
+
+Program Skeleton
+----------------
+As we already saw in the example, Flink streaming programs look like regular 
Python programs.
+Each program consists of the same basic parts:
+
+1. A `main()` function definition, without arguments - the program entry point,
+2. Obtain an `Environment`,
+3. Load/create the initial data,
+4. Specify transformations on this data,
+5. Specify where to put the results of your computations, and
+6. Execute your program.
+
+We will now give an overview of each of those steps but please refer to the 
respective sections for
+more details.
+
+The `main()` function is a must and it is used by Flink execution layer to run 
the
+given Python streaming program.
+
+The `Environment` is the basis for all Flink programs. You can
+obtain one using these static methods on class 
`PythonStreamExecutionEnvironment`:
+
+{% highlight python %}
+get_execution_environment()
+{% endhighlight %}
+
+For specifying data sources the streaming execution environment has several 
methods.
+To just read a text file as a sequence of lines, you can use:
+
+{% highlight python %}
+env = get_execution_environment()
+text = env.read_text_file("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply transformations. 
For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataStream you can apply transformations to create a new
+DataStream which you can then write to a file, transform again, or
+combine with other DataStreams. You apply transformations by calling
+methods on DataStream with your own custom transformation function. For 
example,
+a map transformation looks like this:
+
+{% highlight python %}
+class Doubler(MapFunction):
+    def map(self, value):
+        return value * 2
+
+data.map(Doubler())
+{% endhighlight %}
+
+This will create a new DataStream by doubling every value in the original 
DataStream.
+For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataStream that needs to be written to disk you can call one
+of these methods on DataStream:
+
+{% highlight python %}
+data.write_as_text("<file-path>")
+data.write_as_text("<file-path>", mode=WriteMode.OVERWRITE)
+data.print()
+{% endhighlight %}
+
+The last method is only useful for developing/debugging on a local machine,
+it will output the contents of the DataStream to standard output. (Note that in
+a cluster, the result goes to the standard out stream of the cluster nodes and 
ends
+up in the *.out* files of the workers).
+The first two do as the name suggests.
+Please refer to [Data Sinks](#data-sinks) for more information on writing to 
files.
+
+Once you specified the complete program you need to call `execute` on
+the `Environment`. This will either execute on your local machine or submit 
your program
+for execution on a cluster, depending on how Flink was started. You can force
+a local execution by using `execute(True)`.
+
+{% top %}
+
+Project setup
+---------------
+
+Apart from setting up Flink, no additional work is required. Using Jython to 
execute the Python
+script means that no external packages are needed and the program is executed 
as if it was a jar file.
+
+The Python API was tested on Linux/OSX systems.
+
+{% top %}
+
+Lazy Evaluation
+---------------
+
+All Flink programs are executed lazily: When the program's main method is 
executed, the data loading
+and transformations do not happen directly. Rather, each operation is created 
and added to the
+program's plan. The operations are actually executed when one of the 
`execute()` methods is invoked
+on the Environment object. Whether the program is executed locally or on a 
cluster depends
+on the environment of the program.
+
+The lazy evaluation lets you construct sophisticated programs that Flink 
executes as one
+holistically planned unit.
+
+{% top %}
+
+Transformations
+---------------
+
+Data transformations transform one or more DataStreams into a new DataStream. 
Programs can combine
+multiple transformations into sophisticated assemblies.
+
+This section gives a brief overview of the available transformations. The 
[transformations
+documentation](dataset_transformations.html) has a full description of all 
transformations with
+examples.
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Map</strong><br>PythonDataStream &rarr; PythonDataStream</td>
+      <td>
+        <p>Takes one element and produces one element.</p>
+{% highlight python %}
+class Doubler(MapFunction):
+    def map(self, value):
+        return value * 2
+
+data_stream.map(Doubler())
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong><br>PythonDataStream &rarr; 
PythonDataStream</td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. </p>
+{% highlight python %}
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, word, collector):
+        collector.collect((1, word))
+
+data_stream.flat_map(Tokenizer())
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong><br>PythonDataStream &rarr; 
PythonDataStream</td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for 
which the function
+        returns true.</p>
+{% highlight python %}
+class GreaterThan1000(FilterFunction):
+    def filter(self, value):
+        return value > 1000
+
+data_stream.filter(GreaterThan1000())
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>KeyBy</strong><br>PythonDataStream &rarr; 
PythonKeyedStream</td>
+      <td>
+        <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
+        Internally, this is implemented with hash partitioning. See <a 
href="/dev/api_concepts#specifying-keys">keys</a> on how to specify keys.
+        This transformation returns a PythonKeyedDataStream.</p>
+    {% highlight python %}
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]  # Key by the second element in a tuple
+
+data_stream.key_by(Selector())  # key by the second element of each tuple
+    {% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong><br>PythonKeyedStream &rarr; 
PythonDataStream</td>
+      <td>
+        <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
+                        emits the new value.</p>
+{% highlight python %}
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+data.reduce(Sum())
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Window</strong><br>PythonKeyedStream &rarr; 
PythonWindowedStream</td>
+      <td>
+        <p>Windows can be defined on already partitioned KeyedStreams. Windows 
group the data in each
+        key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+        See <a href="windows.html">windows</a> for a complete description of 
windows.
+    {% highlight python %}
+keyed_stream.count_window(10, 5)  # Last 10 elements, sliding (jumping) by 5 
elements
+
+keyed_stream.time_window(milliseconds(30))  # Last 30 milliseconds of data
+
+keyed_stream.time_window(milliseconds(100), milliseconds(20))  # Last 100 
milliseconds of data, sliding (jumping) by 20 milliseconds
+    {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Window Apply</strong><br>PythonWindowedStream &rarr; 
PythonDataStream</td>
+      <td>
+        <p>Applies a general function to the window as a whole. Below is a 
function that manually sums
+           the elements of a window.</p>
+    {% highlight python %}
+class WindowSum(WindowFunction):
+    def apply(self, key, window, values, collector):
+        sum = 0
+        for value in values:
+            sum += value[0]
+        collector.collect((key, sum))
+
+windowed_stream.apply(WindowSum())
+    {% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Window Reduce</strong><br>PythonWindowedStream &rarr; 
PythonDataStream</td>
+      <td>
+        <p>Applies a functional reduce function to the window and returns the 
reduced value.</p>
+    {% highlight python %}
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+windowed_stream.reduce(Sum())
+    {% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+    <td><strong>Union</strong><br>PythonDataStream* &rarr; 
PythonDataStream</td>
+      <td>
+        <p>Union of two or more data streams creating a new stream containing 
all the elements from all
+           the streams. Note: If you union a data stream with itself you will 
get each element twice
+           in the resulting stream.</p>
+    {% highlight python %}
+data_stream.union(other_stream1, other_stream2, ...)
+    {% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Split</strong><br>PythonDataStream &rarr; 
PythonSplitStream</td>
+      <td>
+        <p>Split the stream into two or more streams according to some 
criterion.
+    {% highlight python %}
+class StreamSelector(OutputSelector):
+    def select(self, value):
+        return ["even"] if value % 2 == 0 else ["odd"]
+
+split_stream = data_stream.split(StreamSelector())
+    {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+      <td>
+        <p> Select one or more streams from a split stream.
+    {% highlight python %}
+even_data_stream = split_stream.select("even")
+odd_data_stream = split_stream.select("odd")
+all_data_stream = split_stream.select("even", "odd")
+    {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Iterate</strong><br>PythonDataStream &rarr; 
PythonIterativeStream &rarr; PythonDataStream</td>
+      <td>
+        <p> Creates a "feedback" loop in the flow, by redirecting the output 
of one operator
+            to some previous operator. This is especially useful for defining 
algorithms that
+            continuously update a model. The following code starts with a 
stream and applies
+            the iteration body continuously. Elements that are greater than 0 
are sent back
+            to the feedback channel, and the rest of the elements are 
forwarded downstream.
+            See <a href="#iterations">iterations</a> for a complete 
description.
+        {% highlight java %}
+class MinusOne(MapFunction):
+    def map(self, value):
+        return value - 1
+
+class PositiveNumber(FilterFunction):
+    def filter(self, value):
+        return value > 0
+
+class LessEqualToZero(FilterFunction):
+    def filter(self, value):
+        return value <= 0
+
+iteration = initial_stream.iterate(5000)
+iteration_body = iteration.map(MinusOne())
+feedback = iteration_body.filter(PositiveNumber())
+iteration.close_with(feedback)
+output = iteration_body.filter(LessEqualToZero())
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+
+{% top %}
+
+Passing Functions to Flink
+--------------------------
+
+Certain operations require user-defined functions as arguments. All the 
functions should be
+defined as Python classes that derived from the relevant Flink function. 
User-defined functions
+are serialized and sent over to the TaskManagers for execution.
+
+{% highlight python %}
+class Filter(FilterFunction):
+    def filter(self, value):
+        return value > 5
+
+data_stream.filter(Filter())
+{% endhighlight %}
+
+Rich functions (e.g. `RichFilterFunction`) allow you to define (override) the 
optional operations: `open` & `close`.
+The user may use these functions for initialization and cleanups.
+
+{% highlight python %}
+class Tokenizer(RichMapFunction):
+    def open(self, config):
+        pass
+    def close(self):
+        pass
+    def map(self, value):
+        pass
+
+data_stream.map(Tokenizer())
+{% endhighlight %}
+
+The `open` function is called by the worker before starting the streaming 
pipeline.
+The `close` function is called by the worker after the streaming pipeline is 
stopped.
+
+{% top %}
+
+Data Types
+----------
+
+Flink's Python Streaming API offers support for primitive Python types (int, 
float, bool,
+string), as well as byte arrays and user-defined classes.
+
+{% highlight python %}
+class Person:
+    def __init__(self, name, age):
+        self.name = name
+        self.age = age
+
+class Tokenizer(MapFunction):
+    def map(self, value):
+        return (1, Person(*value))
+
+data_stream.map(Tokenizer())
+{% endhighlight %}
+
+#### Tuples/Lists
+
+You can use tuples (or lists) for composite types. Python tuples are 
mapped to Jython native corresponding
+types, which are handled by the Python wrapper thin layer.
+
+{% highlight python %}
+word_counts = env.from_elements(("hello", 1), ("world",2))
+
+class Tokenizer(MapFunction):
+    def map(self, value):
+        return value[1]
+
+counts = word_counts.map(Tokenizer())
+{% endhighlight %}
+
+{% top %}
+
+Data Sources
+------------
+
+Data sources create the initial data streams, such as from files or from 
collections.
+
+File-based:
+
+- `read_text_file(path)` - reads files line wise and returns them as a stream 
of Strings.
+
+Collection-based:
+
+- `from_elements(*args)` - Creates a data stream from all the elements.
+- `generate_sequence(from, to)` - Generates the sequence of numbers in the 
given interval, in parallel.
+
+**Examples**
+
+{% highlight python %}
+env  = PythonStreamExecutionEnvironment.get_execution_environment()
+
+\# read text file from the local file system
+localLines = env.read_text_file("file:///path/to/my/textfile")
+
+\# read text file from a HDFS running at nnHost:nnPort
+hdfsLines = env.read_text_file("hdfs://nnHost:nnPort/path/to/my/textfile")
+
+\# create a set from some given elements
+values = env.from_elements("Foo", "bar", "foobar", "fubar")
+
+\# generate a number sequence
+numbers = env.generate_sequence(1, 10000000)
+{% endhighlight %}
+
+{% top %}
+
+Data Sinks
+----------
+
+Data sinks consume DataStreams and are used to store or return them:
+
+- `write_as_text()` - Writes elements line-wise as Strings. The Strings are
+  obtained by calling the *str()* method of each element.
+- `print()` - Prints the *str()* value of each element on the
+  standard out.
+- `write_to_socket()` - Writes the DataStream to a socket [host:port] as a 
byte array.
+
+A DataStream can be input to multiple operations. Programs can write or print 
a data stream and at the
+same time run additional transformations on them.
+
+**Examples**
+
+Standard data sink methods:
+
+{% highlight python %}
+# write DataStream to a file on the local file system
+textData.write_as_text("file:///my/result/on/localFS")
+
+# write DataStream to a file on a HDFS with a namenode running at nnHost:nnPort
+textData.write_as_text("hdfs://nnHost:nnPort/my/result/on/localFS")
+
+# write DataStream to a file and overwrite the file if it exists
+textData.write_as_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
+
+# this writes tuples in the text formatting "(a, b, c)", rather than as CSV 
lines
+values.write_as_text("file:///path/to/the/result/file")
+{% endhighlight %}
+
+{% top %}
+
+Parallel Execution
+------------------
+
+This section describes how the parallel execution of programs can be 
configured in Flink. A Flink
+program consists of multiple tasks (operators, data sources, and sinks). A 
task is split into
+several parallel instances for execution and each parallel instance processes 
a subset of the task's
+input data. The number of parallel instances of a task is called its 
*parallelism* or *degree of
+parallelism (DOP)*.
+
+The degree of parallelism of a task can be specified in Flink on different 
levels.
+
+### Execution Environment Level
+
+Flink programs are executed in the context of an [execution 
environment](#program-skeleton). An
+execution environment defines a default parallelism for all operators, data 
sources, and data sinks
+it executes. Execution environment parallelism can be overwritten by 
explicitly configuring the
+parallelism of an operator.
+
+The default parallelism of an execution environment can be specified by 
calling the
+`set_parallelism()` method. To execute all operators, data sources, and data 
sinks of the
+[WordCount](#example-program) example program with a parallelism of `3`, set 
the default parallelism of the
+execution environment as follows:
+
+{% highlight python %}
+env = PythonStreamExecutionEnvironment.get_execution_environment()
+env.set_parallelism(3)
+
+text.flat_map(Tokenizer()) \
+    .key_by(Selector()) \
+    .time_window(milliseconds(30)) \
+    .reduce(Sum()) \
+    .print()
+
+env.execute()
+{% endhighlight %}
+
+### System Level
+
+A system-wide default parallelism for all execution environments can be 
defined by setting the
+`parallelism.default` property in `./conf/flink-conf.yaml`. See the
+[Configuration]({{ site.baseurl }}/setup/config.html) documentation for 
details.
+
+{% top %}
+
+Executing Plans
+---------------
+
+To run the plan with Flink, go to your Flink distribution, and run the 
pyflink-stream.sh script from the /bin
+folder. The script containing the plan has to be passed as the first argument, 
followed by a number of additional
+Python packages, and finally, separated by `-`, additional arguments that will 
be fed to the script.
+
+{% highlight python %}
+./bin/pyflink-stream.sh <Script>[ <pathToPackage1>[ <pathToPackageX>]][ - 
<param1>[ <paramX>]]
+{% endhighlight %}
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index daaaa82..c555f66 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -93,6 +93,13 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-python_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-scala-shell_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                </dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml 
b/flink-dist/src/main/assemblies/bin.xml
index 993e495..059d7be 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -28,7 +28,7 @@ under the License.
        <includeBaseDirectory>true</includeBaseDirectory>
        <baseDirectory>flink-${project.version}</baseDirectory>
 
-       <!-- Include flink-python.jar in lib/ -->
+       <!-- Include flink-python.jar & flink-streaming-python.jar in lib/ -->
        <dependencySets>
                <dependencySet>
                        <outputDirectory>lib</outputDirectory>
@@ -208,12 +208,28 @@ under the License.
                        </includes>
                </fileSet>
 
+               <!-- copy python streaming jar -->
+               <fileSet>
+                       
<directory>../flink-libraries/flink-streaming-python/target</directory>
+                       <outputDirectory>lib</outputDirectory>
+                       <fileMode>0644</fileMode>
+                       <includes>
+                               
<include>flink-streaming-python_${scala.binary.version}-${project.version}.jar</include>
+                       </includes>
+               </fileSet>
+
                <!-- copy python example to examples of dist -->
                <fileSet>
                        
<directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example</directory>
                        <outputDirectory>examples/python</outputDirectory>
                        <fileMode>0755</fileMode>
                </fileSet>
+               <!-- copy python streaming example to examples of dist -->
+               <fileSet>
+                       
<directory>../flink-examples/flink-examples-streaming/src/main/python</directory>
+                       
<outputDirectory>examples/streaming/python</outputDirectory>
+                       <fileMode>0755</fileMode>
+               </fileSet>
 
        </fileSets>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-dist/src/main/flink-bin/bin/pyflink-stream.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink-stream.sh 
b/flink-dist/src/main/flink-bin/bin/pyflink-stream.sh
new file mode 100644
index 0000000..c31f702
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink-stream.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+"$FLINK_BIN_DIR"/flink run --class 
org.apache.flink.streaming.python.api.PythonStreamBinder -v 
"$FLINK_ROOT_DIR"/lib/flink-streaming-python*.jar "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-examples/flink-examples-streaming/src/main/python/fibonacci.py
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/python/fibonacci.py 
b/flink-examples/flink-examples-streaming/src/main/python/fibonacci.py
new file mode 100644
index 0000000..96f90c8
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/python/fibonacci.py
@@ -0,0 +1,134 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+#
+# An example that illustrates iterations in Flink streaming. The program sums 
up random numbers and counts
+# additions. The operation is done until it reaches a specific threshold in an 
iterative streaming fashion.
+#
+
+import random
+import sys
+from org.apache.flink.api.common.functions import MapFunction
+from org.apache.flink.streaming.api.collector.selector import OutputSelector
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.api.java.utils import ParameterTool
+from org.apache.flink.streaming.python.api.environment import 
PythonStreamExecutionEnvironment
+
+TARGET_VAL = 100
+MAX_INT_START = 50
+
+
+class Generator(SourceFunction):
+    def __init__(self, num_iters=1000):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            self.do(ctx)
+            counter += 1
+
+    def do(self, ctx):
+        two_numbers = "{}, {}".format(random.randrange(1, MAX_INT_START), 
random.randrange(1, MAX_INT_START))
+        ctx.collect(two_numbers)
+
+    def cancel(self):
+        self._running = False
+
+
+class Fib(MapFunction):
+    def map(self, value):
+        return (value[0], value[1], value[3], value[2] + value[3], value[4] + 
1)
+
+
+class InPut(MapFunction):
+    def map(self, value):
+        num1, num2 = value.split(",")
+
+        num1 = int(num1.strip())
+        num2 = int(num2.strip())
+
+        return (num1, num2, num1, num2, 0)
+
+
+class OutPut(MapFunction):
+    def map(self, value):
+        return ((value[0], value[1]), value[4])
+
+
+class StreamSelector(OutputSelector):
+    def select(self, value):
+        return ["iterate"] if value[3] < TARGET_VAL else ["output"]
+
+
+class Main:
+    def __init__(self, args):
+        self._args = args
+
+    def run(self):
+        _params = ParameterTool.fromArgs(self._args)
+
+        env = PythonStreamExecutionEnvironment.get_execution_environment()
+
+        # create input stream of integer pairs
+        if "--input" in self._args:
+            try:
+                file_path = _params.get("input")
+                input_stream = env.read_text_file(file_path)
+            except Exception as e:
+                print(e)
+                print ("Error in reading input file. Exiting...")
+                sys.exit(5)
+        else:
+            input_stream = env.create_python_source(Generator(num_iters=50))
+
+        # create an iterative data stream from the input with 5 second timeout
+        it = input_stream.map(InPut()).iterate(5000)
+
+        # apply the step function to get the next Fibonacci number
+        # increment the counter and split the output with the output selector
+        step = it.map(Fib()).split(StreamSelector())
+        it.close_with(step.select("iterate"))
+
+        # to produce the final output select the tuples directed to the
+        # 'output' channel then get the input pairs that have the greatest 
iteration counter
+        # on a 1 second sliding window
+        output = step.select("output")
+        parsed_output = output.map(OutPut())
+        if "--output" in self._args:
+            try:
+                file_path = _params.get("output")
+                parsed_output.write_as_text(file_path)
+            except Exception as e:
+                print (e)
+                print ("Error in writing to output file. will print instead.")
+                parsed_output.print()
+        else:
+            parsed_output.print()
+        result = env.execute("Fibonacci Example (py)", True if 
_params.has("local") else False)
+        print("Fibonacci job completed, job_id={}".format(result.jobID))
+
+
+def main():
+    argv = sys.argv[1:] if len(sys.argv) > 1 else []
+    Main(argv).run()
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-streaming-python/pom.xml 
b/flink-libraries/flink-streaming-python/pom.xml
new file mode 100644
index 0000000..68e205a
--- /dev/null
+++ b/flink-libraries/flink-streaming-python/pom.xml
@@ -0,0 +1,104 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-libraries</artifactId>
+               <version>1.4-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-streaming-python_${scala.binary.version}</artifactId>
+       <name>flink-streaming-python</name>
+       <packaging>jar</packaging>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <configuration>
+                                       <descriptorRefs>
+                                               
<descriptorRef>jar-with-dependencies</descriptorRef>
+                                       </descriptorRefs>
+                                       <archive>
+                                               <manifest>
+                                                       
<addClasspath>true</addClasspath>
+                                                       
<mainClass>org.apache.flink.streaming.python.api.PythonStreamBinder</mainClass>
+                                               </manifest>
+                                       </archive>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+            <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+            <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+            <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+            <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.python</groupId>
+                       <artifactId>jython-standalone</artifactId>
+                       <version>2.7.0</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+            <scope>provided</scope>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
new file mode 100644
index 0000000..ad5f432
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import 
org.apache.flink.streaming.python.api.environment.PythonEnvironmentConfig;
+import org.apache.flink.streaming.python.api.functions.UtilityFunctions;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.io.IOException;
+
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * Allows the execution of a Flink stream plan that is written in Python.
+ */
+public class PythonStreamBinder {
+       private static final Random r = new Random(System.currentTimeMillis());
+       public static final String FLINK_PYTHON_FILE_PATH = 
System.getProperty("java.io.tmpdir") + File.separator + "flink_streaming_plan_";
+
+
+       private PythonStreamBinder() {
+       }
+
+       /**
+        * Entry point for the execution of a python streaming task.
+        *
+        * @param args <pathToScript> [<pathToPackage1> .. <pathToPackageX>] - 
[parameter1]..[parameterX]
+        * @throws Exception
+        */
+       public static void main(String[] args) throws Exception {
+               PythonStreamBinder binder = new PythonStreamBinder();
+               binder.runPlan(args);
+       }
+
+       private void runPlan(String[] args) throws Exception {
+               File script;
+               if (args.length < 1) {
+                       System.out.println("Usage: prog <pathToScript> 
[<pathToPackage1> .. <pathToPackageX>] - [parameter1]..[parameterX]");
+                       return;
+               }
+               else {
+                       script = new File(args[0]);
+                       if ((!script.exists()) || (!script.isFile()))
+                       {
+                               throw new FileNotFoundException("Could not find 
file: " + args[0]);
+                       }
+               }
+
+               int split = 0;
+               for (int x = 0; x < args.length; x++) {
+                       if (args[x].compareTo("-") == 0) {
+                               split = x;
+                       }
+               }
+
+               GlobalConfiguration.loadConfiguration();
+
+               String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextLong();
+               prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? 
args.length : split));
+
+               if (split != 0) {
+                       String[] a = new String[args.length - split];
+                       a[0] = args[0];
+                       System.arraycopy(args, split + 1, a, 1, args.length - 
(split +1));
+                       args = a;
+               } else if (args.length > 1) {
+                       args = new String[]{args[0]};
+               }
+
+               UtilityFunctions.initAndExecPythonScript(new File(
+                       tmpPath + File.separator + 
PythonEnvironmentConfig.FLINK_PYTHON_PLAN_NAME), args);
+       }
+
+       /**
+        * Prepares a single package from the given python script. This 
involves generating a unique main
+        * python script, which loads the user's provided script for execution. 
The reason for this is to create
+        * a different namespace for different script contents. In addition, it 
copies all relevant files
+        * to a temporary folder for execution and distribution.
+        *
+        * @param tempFilePath The path to copy all the files to
+        * @param filePaths The user's script and dependent packages/files
+        * @throws IOException
+        * @throws URISyntaxException
+        */
+       private void prepareFiles(String tempFilePath, String... filePaths) 
throws IOException, URISyntaxException,
+               NoSuchAlgorithmException {
+               PythonEnvironmentConfig.pythonTmpCachePath = tempFilePath;
+
+               Files.createDirectories(Paths.get(tempFilePath));
+
+               String dstMainScriptFullPath = tempFilePath + File.separator + 
FilenameUtils.getBaseName(filePaths[0]) +
+                       calcPythonMd5(filePaths[0]) + ".py";
+
+               generateAndCopyPlanFile(tempFilePath, dstMainScriptFullPath);
+
+               //main script
+               copyFile(filePaths[0], tempFilePath, 
FilenameUtils.getName(dstMainScriptFullPath));
+
+               String parentDir = new File(filePaths[0]).getParent();
+
+               //additional files/folders(modules)
+               for (int x = 1; x < filePaths.length; x++) {
+                       String currentParent = (new 
File(filePaths[x])).getParent();
+                       if (currentParent.startsWith(".")) {
+                               filePaths[x] = parentDir + File.separator + 
filePaths[x];
+                       }
+                       copyFile(filePaths[x], tempFilePath, null);
+               }
+       }
+
+       private void generateAndCopyPlanFile(String dstPath, String 
mainScriptFullPath) throws IOException {
+               String moduleName = 
FilenameUtils.getBaseName(mainScriptFullPath);
+
+               FileWriter fileWriter = new FileWriter(dstPath + File.separator 
+ PythonEnvironmentConfig.FLINK_PYTHON_PLAN_NAME);
+               PrintWriter printWriter = new PrintWriter(fileWriter);
+               printWriter.printf("import %s\n\n", moduleName);
+               printWriter.printf("%s.main()\n", moduleName);
+               printWriter.close();
+       }
+
+       private void copyFile(String path, String target, String name) throws 
IOException, URISyntaxException {
+               if (path.endsWith(File.separator)) {
+                       path = path.substring(0, path.length() - 1);
+               }
+               String identifier = name == null ? 
path.substring(path.lastIndexOf(File.separator)) : name;
+               String tmpFilePath = target + File.separator + identifier;
+               Path p = new Path(path);
+               FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new 
Path(tmpFilePath), true);
+       }
+
+
+       /**
+        * Naive MD5 calculation from the python content of the given script. 
Spaces, blank lines and comments
+        * are ignored.
+        *
+        * @param filePath  the full path of the given python script
+        * @return  the md5 value as a string
+        * @throws NoSuchAlgorithmException
+        * @throws IOException
+        */
+       private String calcPythonMd5(String filePath) throws 
NoSuchAlgorithmException, IOException {
+               FileReader fileReader = new FileReader(filePath);
+               BufferedReader bufferedReader = new BufferedReader(fileReader);
+
+               String line;
+               MessageDigest md = MessageDigest.getInstance("MD5");
+               while ((line = bufferedReader.readLine()) != null) {
+                       line = line.trim();
+                       if (line.isEmpty() || line.startsWith("#")) {
+                               continue;
+                       }
+                       byte[] bytes = line.getBytes();
+                       md.update(bytes, 0, bytes.length);
+               }
+               fileReader.close();
+
+               byte[] mdBytes = md.digest();
+
+               //convert the byte to hex format method 1
+               StringBuffer sb = new StringBuffer();
+               for (int i = 0; i < mdBytes.length; i++) {
+                       sb.append(Integer.toString((mdBytes[i] & 0xff) + 0x100, 
16).substring(1));
+               }
+               return sb.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java
new file mode 100644
index 0000000..96feb0b
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import org.apache.flink.streaming.python.api.functions.PythonFilterFunction;
+import org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonKeySelector;
+import org.apache.flink.streaming.python.api.functions.PythonMapFunction;
+import org.apache.flink.streaming.python.api.functions.PythonOutputSelector;
+import org.apache.flink.streaming.python.api.functions.PythonSinkFunction;
+import 
org.apache.flink.streaming.python.util.serialization.PythonSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+
+/**
+ * A {@code PythonDataStream} is a thin wrapper layer over {@link DataStream}, 
which represents a
+ * stream of elements of the same type. A {@code PythonDataStream} can be 
transformed into
+ * another {@code PythonDataStream} by applying various transformation 
functions, such as
+ * <ul>
+ * <li>{@link PythonDataStream#map}
+ * <li>{@link PythonDataStream#split}
+ * </ul>
+ *
+ * <p>A thin wrapper layer means that the functionality itself is performed by 
the
+ * {@link DataStream}, however instead of working directly with the streaming 
data sets,
+ * this layer handles Python wrappers (e.g. {@code PythonDataStream}) to 
comply with the
+ * Python standard coding styles.</p>
+ */
+@PublicEvolving
+public class PythonDataStream<D extends DataStream<PyObject>> {
+       protected final D stream;
+
+       public PythonDataStream(D stream) {
+               this.stream = stream;
+       }
+
+       /**
+        * A thin wrapper layer over {@link DataStream#union(DataStream[])}.
+        *
+        * @param streams
+        *            The Python DataStreams to union output with.
+        * @return The {@link PythonDataStream}.
+        */
+       @SafeVarargs
+       @SuppressWarnings("unchecked")
+       public final PythonDataStream union(PythonDataStream... streams) {
+               ArrayList<DataStream<PyObject>> dsList = new ArrayList<>();
+               for (PythonDataStream ps : streams) {
+                       dsList.add(ps.stream);
+               }
+               DataStream<PyObject>[] dsArray = new DataStream[dsList.size()];
+               return new 
PythonDataStream(stream.union(dsList.toArray(dsArray)));
+       }
+
+       /**
+        * A thin wrapper layer over {@link DataStream#split(OutputSelector)}.
+        *
+        * @param output_selector
+        *            The user defined {@link OutputSelector} for directing the 
tuples.
+        * @return The {@link PythonSplitStream}
+        */
+       public PythonSplitStream split(OutputSelector<PyObject> 
output_selector) throws IOException {
+               return new PythonSplitStream(this.stream.split(new 
PythonOutputSelector(output_selector)));
+       }
+
+       /**
+        * A thin wrapper layer over {@link DataStream#filter(FilterFunction)}.
+        *
+        * @param filter
+        *            The FilterFunction that is called for each element of the 
DataStream.
+        * @return The filtered {@link PythonDataStream}.
+        */
+       public PythonSingleOutputStreamOperator filter(FilterFunction<PyObject> 
filter) throws IOException {
+               return new PythonSingleOutputStreamOperator(stream.filter(new 
PythonFilterFunction(filter)));
+       }
+
+       /**
+        * A thin wrapper layer over {@link DataStream#map(MapFunction)}.
+        *
+        * @param mapper
+        *            The MapFunction that is called for each element of the
+        *            DataStream.
+        * @return The transformed {@link PythonDataStream}.
+        */
+       public PythonDataStream<SingleOutputStreamOperator<PyObject>> map(
+               MapFunction<PyObject, PyObject> mapper) throws IOException {
+               return new PythonDataStream<>(stream.map(new 
PythonMapFunction(mapper)));
+       }
+
+       /**
+        * A thin wrapper layer over {@link 
DataStream#flatMap(FlatMapFunction)}.
+        *
+        * @param flat_mapper
+        *            The FlatMapFunction that is called for each element of the
+        *            DataStream
+        *
+        * @return The transformed {@link PythonDataStream}.
+        */
+       public PythonDataStream<SingleOutputStreamOperator<PyObject>> flat_map(
+               FlatMapFunction<PyObject, PyObject> flat_mapper) throws 
IOException {
+               return new PythonDataStream<>(stream.flatMap(new 
PythonFlatMapFunction(flat_mapper)));
+       }
+
+       /**
+        * A thin wrapper layer over {@link DataStream#keyBy(KeySelector)}.
+        *
+        * @param selector
+        *            The KeySelector to be used for extracting the key for 
partitioning
+        * @return The {@link PythonDataStream} with partitioned state (i.e. 
{@link PythonKeyedStream})
+        */
+       public PythonKeyedStream key_by(KeySelector<PyObject, PyKey> selector) 
throws IOException {
+               return new PythonKeyedStream(stream.keyBy(new 
PythonKeySelector(selector)));
+       }
+
+       /**
+        * A thin wrapper layer over {@link DataStream#print()}.
+        */
+       @PublicEvolving
+       public void print() {
+               stream.print();
+       }
+
+       /**
+        * A thin wrapper layer over {@link 
DataStream#writeAsText(java.lang.String)}.
+        *
+        * @param path
+        *            The path pointing to the location the text file is 
written to.
+        */
+       @PublicEvolving
+       public void write_as_text(String path) {
+               stream.writeAsText(path);
+       }
+
+       /**
+        * A thin wrapper layer over {@link 
DataStream#writeAsText(java.lang.String, WriteMode)}.
+        *
+        * @param path
+        *            The path pointing to the location the text file is 
written to
+        * @param mode
+        *            Controls the behavior for existing files. Options are
+        *            NO_OVERWRITE and OVERWRITE.
+        */
+       @PublicEvolving
+       public void write_as_text(String path, WriteMode mode) {
+               stream.writeAsText(path, mode);
+       }
+
+       /**
+        * A thin wrapper layer over {@link 
DataStream#writeToSocket(java.lang.String, int, SerializationSchema)}
+        *
+        * @param host
+        *            host of the socket
+        * @param port
+        *            port of the socket
+        * @param schema
+        *            schema for serialization
+        */
+       @PublicEvolving
+       public void write_to_socket(String host, Integer port, 
SerializationSchema<PyObject> schema) throws IOException {
+               stream.writeToSocket(host, port, new 
PythonSerializationSchema(schema));
+       }
+
+       /**
+        * A thin wrapper layer over {@link DataStream#addSink(SinkFunction)}
+        *
+        * @param sink_func
+        *            The object containing the sink's invoke function.
+        */
+       @PublicEvolving
+       public void add_sink(SinkFunction<PyObject> sink_func) throws 
IOException {
+               stream.addSink(new PythonSinkFunction(sink_func));
+       }
+
+       /**
+        * A thin wrapper layer over {@link DataStream#iterate()}.
+        *
+        * <p>Initiates an iterative part of the program that feeds back data 
streams.
+        * The iterative part needs to be closed by calling
+        * {@link PythonIterativeStream#close_with(PythonDataStream)}. The 
transformation of
+        * this IterativeStream will be the iteration head. The data stream
+        * given to the {@link 
PythonIterativeStream#close_with(PythonDataStream)} method is
+        * the data stream that will be fed back and used as the input for the
+        * iteration head. </p>
+        * <p>
+        * A common usage pattern for streaming iterations is to use output
+        * splitting to send a part of the closing data stream to the head. 
Refer to
+        * {@link #split(OutputSelector)} for more information.
+        * <p>
+        * The iteration edge will be partitioned the same way as the first 
input of
+        * the iteration head unless it is changed in the
+        * {@link PythonIterativeStream#close_with(PythonDataStream)} call.
+        * <p>
+        * By default a PythonDataStream with iteration will never terminate, 
but the user
+        * can use the maxWaitTime parameter to set a max waiting time for the
+        * iteration head. If no data received in the set time, the stream
+        * terminates.
+        *
+        * @return The iterative data stream created.
+        */
+       @PublicEvolving
+       public PythonIterativeStream iterate() {
+               return new PythonIterativeStream(this.stream.iterate());
+       }
+
+       /**
+        * A thin wrapper layer over {@link DataStream#iterate(long)}.
+        *
+        * <p>Initiates an iterative part of the program that feeds back 
data streams.
+        * The iterative part needs to be closed by calling
+        * {@link PythonIterativeStream#close_with(PythonDataStream)}. The 
transformation of
+        * this IterativeStream will be the iteration head. The data stream
+        * given to the {@link 
PythonIterativeStream#close_with(PythonDataStream)} method is
+        * the data stream that will be fed back and used as the input for the
+        * iteration head.</p>
+        * <p>
+        * A common usage pattern for streaming iterations is to use output
+        * splitting to send a part of the closing data stream to the head. 
Refer to
+        * {@link #split(OutputSelector)} for more information.
+        * <p>
+        * The iteration edge will be partitioned the same way as the first 
input of
+        * the iteration head unless it is changed in the
+        * {@link PythonIterativeStream#close_with(PythonDataStream)} call.
+        * <p>
+        * By default a PythonDataStream with iteration will never terminate, 
but the user
+        * can use the maxWaitTime parameter to set a max waiting time for the
+        * iteration head. If no data received in the set time, the stream
+        * terminates.
+        *
+        * @param max_wait_time_ms
+        *          Number of milliseconds to wait between inputs before 
shutting
+        *          down
+        *
+        * @return The iterative data stream created.
+        */
+       @PublicEvolving
+       public PythonIterativeStream iterate(Long max_wait_time_ms) {
+               return new 
PythonIterativeStream(this.stream.iterate(max_wait_time_ms));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonIterativeStream.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonIterativeStream.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonIterativeStream.java
new file mode 100644
index 0000000..ced1c24
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonIterativeStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.python.core.PyObject;
+
+/**
+ * A thin wrapper layer over {@link IterativeStream}.
+ *
+ * <p>The python iterative data stream represents the start of an iteration in 
a
+ * {@link PythonDataStream}.</p>
+ */
+@PublicEvolving
+public class PythonIterativeStream extends PythonSingleOutputStreamOperator {
+
+       public PythonIterativeStream(IterativeStream<PyObject> iterativeStream) 
{
+               super(iterativeStream);
+       }
+
+       /**
+        * A thin wrapper layer over {@link 
IterativeStream#closeWith(org.apache.flink.streaming.api.datastream.DataStream)}
+        *
+        * Please note that this function works with {@link PythonDataStream} 
and thus wherever a DataStream is mentioned in
+        * the above {@link 
IterativeStream#closeWith(org.apache.flink.streaming.api.datastream.DataStream)}
 description,
+        * the user may regard it as {@link PythonDataStream} .
+        *
+ *
+        * @param feedback_stream
+        *            {@link PythonDataStream} that will be used as input to 
the iteration
+        *            head.
+        *
+        * @return The feedback stream.
+        *
+        */
+       public PythonDataStream close_with(PythonDataStream<? extends 
DataStream<PyObject>> feedback_stream) {
+               
((IterativeStream<PyObject>)this.stream).closeWith(feedback_stream.stream);
+               return feedback_stream;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonKeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonKeyedStream.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonKeyedStream.java
new file mode 100644
index 0000000..f259374
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonKeyedStream.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import org.apache.flink.streaming.python.api.functions.PythonReduceFunction;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
+
/**
 * A thin wrapper layer over {@link KeyedStream}.
 *
 * <p>A {@code PythonKeyedStream} represents a {@link PythonDataStream} on which operator state is
 * partitioned by key using a provided {@link org.apache.flink.api.java.functions.KeySelector}.</p>
 */
@Public
public class PythonKeyedStream extends PythonDataStream<KeyedStream<PyObject, PyKey>> {

	public PythonKeyedStream(KeyedStream<PyObject, PyKey> stream) {
		super(stream);
	}

	/**
	 * A thin wrapper layer over {@link KeyedStream#countWindow(long, long)}.
	 *
	 * @param size The size of the windows in number of elements.
	 * @param slide The slide interval in number of elements.
	 * @return The python windowed stream {@link PythonWindowedStream}
	 */
	public PythonWindowedStream count_window(long size, long slide) {
		return new PythonWindowedStream<GlobalWindow>(this.stream.countWindow(size, slide));
	}

	/**
	 * A thin wrapper layer over {@link KeyedStream#timeWindow(Time)}.
	 *
	 * @param size The size of the window.
	 * @return The python windowed stream {@link PythonWindowedStream}
	 */
	public PythonWindowedStream time_window(Time size) {
		return new PythonWindowedStream<TimeWindow>(this.stream.timeWindow(size));
	}

	/**
	 * A thin wrapper layer over {@link KeyedStream#timeWindow(Time, Time)}.
	 *
	 * @param size The size of the window.
	 * @param slide The slide interval of the window.
	 * @return The python wrapper {@link PythonWindowedStream}
	 */
	public PythonWindowedStream time_window(Time size, Time slide) {
		return new PythonWindowedStream<TimeWindow>(this.stream.timeWindow(size, slide));
	}

	/**
	 * A thin wrapper layer over {@link KeyedStream#reduce(ReduceFunction)}.
	 *
	 * @param reducer
	 *            The {@link ReduceFunction} that will be called for every
	 *            element of the input values with the same key.
	 * @return The transformed data stream {@link PythonSingleOutputStreamOperator}.
	 * @throws IOException presumably propagated from wrapping the function in a
	 *             {@link PythonReduceFunction} (serialization of the python UDF) — confirm
	 *             against that constructor.
	 */
	public PythonSingleOutputStreamOperator reduce(ReduceFunction<PyObject> reducer) throws IOException {
		return new PythonSingleOutputStreamOperator(this.stream.reduce(new PythonReduceFunction(reducer)));
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
new file mode 100644
index 0000000..f701d98
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in a 
python user-defined
+ * function (UDF).
+ * <p>The fact the this field is not set, results in a dynamic calculation of 
this serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 
64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python class 
inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class is 
created. Its name is
+ * constructed using the following pattern:
+ * <b>{@code org.python.proxies.<module-name>$<UDF-class-name>$<number>}</b>. 
The {@code <number>}
+ * part is increased by one in runtime, for every job submission. It results 
in different serial
+ * version UID for each run for the same Python class. Therefore, it is 
required to silently
+ * suppress the serial version UID mismatch check.</p>
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {
+
+       public PythonObjectInputStream2(InputStream in) throws IOException {
+               super(in);
+       }
+
+       protected ObjectStreamClass readClassDescriptor() throws IOException, 
ClassNotFoundException {
+               ObjectStreamClass resultClassDescriptor = 
super.readClassDescriptor(); // initially streams descriptor
+
+               Class<?> localClass;
+               try {
+                       localClass = resolveClass(resultClassDescriptor);
+               } catch (ClassNotFoundException e) {
+                       System.out.println("No local class for " + 
resultClassDescriptor.getName());
+                       return resultClassDescriptor;
+               }
+
+               ObjectStreamClass localClassDescriptor = 
ObjectStreamClass.lookup(localClass);
+               if (localClassDescriptor != null) { // only if class implements 
serializable
+                       final long localSUID = 
localClassDescriptor.getSerialVersionUID();
+                       final long streamSUID = 
resultClassDescriptor.getSerialVersionUID();
+                       if (streamSUID != localSUID) { // check for 
serialVersionUID mismatch.
+                               // Overriding serialized class version mismatch
+                               resultClassDescriptor = localClassDescriptor; 
// Use local class descriptor for deserialization
+                       }
+               }
+               return resultClassDescriptor;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSingleOutputStreamOperator.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSingleOutputStreamOperator.java
new file mode 100644
index 0000000..add81f7
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSingleOutputStreamOperator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.python.core.PyObject;
+
+
/**
 * A thin wrapper layer over {@link SingleOutputStreamOperator}.
 *
 * <p>{@code PythonSingleOutputStreamOperator} represents a user defined transformation
 * applied on a {@link PythonDataStream} with one predefined output type.</p>
 */
@Public
public class PythonSingleOutputStreamOperator extends PythonDataStream<SingleOutputStreamOperator<PyObject>> {
	/**
	 * Wraps the given operator; all stream operations are delegated to it via the parent class.
	 *
	 * @param stream The {@link SingleOutputStreamOperator} to wrap.
	 */
	public PythonSingleOutputStreamOperator(SingleOutputStreamOperator<PyObject> stream) {
		super(stream);
	}
}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSplitStream.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSplitStream.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSplitStream.java
new file mode 100644
index 0000000..e7621ca
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonSplitStream.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.python.core.PyObject;
+
+
+/**
+ * A thin wrapper layer over {@link SplitStream}.
+ *
+ * <p>The {@code PythonSplitStream} represents an operator that has been split 
using an
+ * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}. 
Named outputs
+ * can be selected using the {@link #select} function. To apply transformation 
on the whole
+ * output simply call the transformation on the {@code PythonSplitStream}</p>
+ */
+@PublicEvolving
+public class PythonSplitStream extends PythonDataStream<SplitStream<PyObject>> 
{
+
+       public PythonSplitStream(SplitStream<PyObject> splitStream) {
+               super(splitStream);
+       }
+
+       /**
+        * A thin wrapper layer over {@link 
SplitStream#select(java.lang.String...)}
+        *
+        * @param output_names
+        *            The output names for which the operator will receive the
+        *            input.
+        * @return Returns the selected {@link PythonDataStream}
+        */
+       public PythonDataStream select(String... output_names) {
+               return new PythonDataStream<>(this.stream.select(output_names));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonWindowedStream.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonWindowedStream.java
new file mode 100644
index 0000000..563661e
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonWindowedStream.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.python.api.functions.PyKey;
+import org.apache.flink.streaming.python.api.functions.PythonApplyFunction;
+import org.apache.flink.streaming.python.api.functions.PythonReduceFunction;
+import org.python.core.PyObject;
+
+import java.io.IOException;
+
/**
 * A thin wrapper layer over {@link WindowedStream}.
 *
 * <p>A {@code PythonWindowedStream} represents a data stream where elements are grouped by
 * key, and for each key, the stream of elements is split into windows based on a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
 * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.</p>
 */
@Public
public class PythonWindowedStream<W extends Window> {
	// The wrapped Flink windowed stream; all operations delegate to it.
	private final WindowedStream<PyObject, PyKey, W> stream;

	public PythonWindowedStream(WindowedStream<PyObject, PyKey, W> stream) {
		this.stream = stream;
	}

	/**
	 * A thin wrapper layer over {@link WindowedStream#reduce(org.apache.flink.api.common.functions.ReduceFunction)}.
	 *
	 * @param fun The reduce function.
	 * @return The data stream that is the result of applying the reduce function to the window.
	 * @throws IOException presumably propagated from wrapping the function in a
	 *             {@link PythonReduceFunction} — confirm against that constructor.
	 */
	@SuppressWarnings("unchecked")
	public PythonSingleOutputStreamOperator reduce(ReduceFunction<PyObject> fun) throws IOException {
		return new PythonSingleOutputStreamOperator(stream.reduce(new PythonReduceFunction(fun)));
	}

	/**
	 * A thin wrapper layer over {@link WindowedStream#apply(WindowFunction)}.
	 *
	 * @param fun The window function.
	 * @return The data stream that is the result of applying the window function to the window.
	 * @throws IOException presumably propagated from wrapping the function in a
	 *             {@link PythonApplyFunction} — confirm against that constructor.
	 */
	public PythonSingleOutputStreamOperator apply(
		WindowFunction<PyObject, PyObject, Object, W> fun) throws IOException {
		return new PythonSingleOutputStreamOperator(stream.apply(new PythonApplyFunction<>(fun)));
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/00284fb8/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentConfig.java
 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentConfig.java
new file mode 100644
index 0000000..2031af1
--- /dev/null
+++ 
b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.environment;
+
+
/**
 * Configuration constants and shared mutable paths for the Python streaming environment.
 *
 * <p>This is a static holder: all members are {@code static} and the class is never
 * instantiated.</p>
 */
public class PythonEnvironmentConfig {

	/** Identifier used for the Flink python distributed-cache entries. */
	public static final String FLINK_PYTHON_DC_ID = "flink";

	/** File name under which the user's python plan script is registered. */
	public static final String FLINK_PYTHON_PLAN_NAME = "plan.py";

	/**
	 * Holds the path for the local python files cache. It is set only on the client side by
	 * the python streaming plan binder.
	 */
	public static String pythonTmpCachePath;

	/**
	 * Holds the path in the shared storage at which the python script(s) reside. It is set on
	 * the client side within the execution process.
	 */
	public static String FLINK_HDFS_PATH = "hdfs:///tmp/flink"; // "file:/tmp/flink"

	/** Static holder class; not meant to be instantiated. */
	private PythonEnvironmentConfig() {
	}
}

Reply via email to