Repository: flink
Updated Branches:
  refs/heads/master e04239dd7 -> 93c61c097


[FLINK-3626] [py] Add zipWithIndex()

This closes #2136

Create a task_id message in PythonStreamer that is passed through
to the underlying process and included in the RuntimeContext, where
it is accessible to the user and to functions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93c61c09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93c61c09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93c61c09

Branch: refs/heads/master
Commit: 93c61c097a889c55f412ff31524c749851ca872f
Parents: e04239d
Author: Shannon Quinn <mag...@gmail.com>
Authored: Wed Mar 16 17:11:08 2016 -0400
Committer: zentol <ches...@apache.org>
Committed: Wed Jun 22 12:05:20 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/python.md                       | 10 +++++
 docs/apis/batch/zip_elements_guide.md           | 19 ++++++++-
 .../api/streaming/data/PythonStreamer.java      |  1 +
 .../api/flink/functions/CoGroupFunction.py      |  6 +--
 .../python/api/flink/functions/Function.py      |  4 +-
 .../api/flink/functions/GroupReduceFunction.py  |  4 +-
 .../api/flink/functions/ReduceFunction.py       |  4 +-
 .../api/flink/functions/RuntimeContext.py       |  6 ++-
 .../flink/python/api/flink/plan/DataSet.py      | 44 +++++++++++++++++++-
 .../flink/python/api/flink/plan/Environment.py  |  3 +-
 .../org/apache/flink/python/api/test_main.py    | 11 ++++-
 11 files changed, 96 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/docs/apis/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md
index 0f55124..9158dfb 100644
--- a/docs/apis/batch/python.md
+++ b/docs/apis/batch/python.md
@@ -312,6 +312,16 @@ data.union(data2)
 {% endhighlight %}
       </td>
     </tr>
+    <tr>
+      <td><strong>ZipWithIndex</strong></td>
+      <td>
+        <p>Assigns consecutive indexes to each element. For more information, 
please refer to
+        the [Zip Elements 
Guide](zip_elements_guide.html#zip-with-a-dense-index).</p>
+{% highlight python %}
+data.zip_with_index()
+{% endhighlight %}
+      </td>
+    </tr>
   </tbody>
 </table>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/docs/apis/batch/zip_elements_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/zip_elements_guide.md 
b/docs/apis/batch/zip_elements_guide.md
index 59f723a..e3e93b5 100644
--- a/docs/apis/batch/zip_elements_guide.md
+++ b/docs/apis/batch/zip_elements_guide.md
@@ -34,7 +34,7 @@ This document shows how {% gh_link 
/flink-java/src/main/java/org/apache/flink/ap
 ### Zip with a Dense Index
 `zipWithIndex` assigns consecutive labels to the elements, receiving a data 
set as input and returning a new data set of `(unique id, initial value)` 
2-tuples.
 This process requires two passes, first counting then labeling elements, and 
cannot be pipelined due to the synchronization of counts.
-The alternative `zipWIthUniqueId` works in a pipelined fashion and is 
preferred when a unique labeling is sufficient.
+The alternative `zipWithUniqueId` works in a pipelined fashion and is 
preferred when a unique labeling is sufficient.
 For example, the following code:
 
 <div class="codetabs" markdown="1">
@@ -66,6 +66,21 @@ env.execute()
 {% endhighlight %}
 </div>
 
+<div data-lang="scala" markdown="1">
+{% highlight python %}
+from flink.plan.Environment import get_environment
+
+env = get_environment()
+env.set_parallelism(2)
+input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")
+
+result = input.zipWithIndex()
+
+result.write_text(result_path)
+env.execute()
+{% endhighlight %}
+</div>
+
 </div>
 
 may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
@@ -74,7 +89,7 @@ may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), 
(5,D), (6,E), (7,F)
 
 ### Zip with a Unique Identifier
 In many cases one may not need to assign consecutive labels.
-`zipWIthUniqueId` works in a pipelined fashion, speeding up the label 
assignment process. This method receives a data set as input and returns a new 
data set of `(unique id, initial value)` 2-tuples.
+`zipWithUniqueId` works in a pipelined fashion, speeding up the label 
assignment process. This method receives a data set as input and returns a new 
data set of `(unique id, initial value)` 2-tuples.
 For example, the following code:
 
 <div class="codetabs" markdown="1">

http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index e67099e..10aded8 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -130,6 +130,7 @@ public class PythonStreamer implements Serializable {
                processOutput.write("operator\n".getBytes());
                processOutput.write(("" + server.getLocalPort() + 
"\n").getBytes());
                processOutput.write((id + "\n").getBytes());
+               
processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() 
+ "\n").getBytes());
                processOutput.write((inputFilePath + "\n").getBytes());
                processOutput.write((outputFilePath + "\n").getBytes());
                processOutput.flush();

http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
index 4cb337a..83f563b 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
@@ -25,13 +25,13 @@ class CoGroupFunction(Function.Function):
         self._keys1 = None
         self._keys2 = None
 
-    def _configure(self, input_file, output_file, port, env, info):
+    def _configure(self, input_file, output_file, port, env, info, 
subtask_index):
         self._connection = 
Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port)
         self._iterator = Iterator.Iterator(self._connection, env, 0)
         self._iterator2 = Iterator.Iterator(self._connection, env, 1)
         self._cgiter = Iterator.CoGroupIterator(self._iterator, 
self._iterator2, self._keys1, self._keys2)
         self._collector = Collector.Collector(self._connection, env, info)
-        self.context = RuntimeContext.RuntimeContext(self._iterator, 
self._collector)
+        self.context = RuntimeContext.RuntimeContext(self._iterator, 
self._collector, subtask_index)
         if info.chained_info is not None:
             info.chained_info.operator._configure_chain(self.context, 
self._collector, info.chained_info)
             self._collector = info.chained_info.operator
@@ -53,4 +53,4 @@ class CoGroupFunction(Function.Function):
         collector._close()
 
     def co_group(self, iterator1, iterator2, collector):
-        pass
\ No newline at end of file
+        pass

http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
index dfe6a28..45a0f2e 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
@@ -32,11 +32,11 @@ class Function(object):
         self.context = None
         self._env = None
 
-    def _configure(self, input_file, output_file, port, env, info):
+    def _configure(self, input_file, output_file, port, env, info, 
subtask_index):
         self._connection = 
Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
         self._iterator = Iterator.Iterator(self._connection, env)
         self._collector = Collector.Collector(self._connection, env, info)
-        self.context = RuntimeContext.RuntimeContext(self._iterator, 
self._collector)
+        self.context = RuntimeContext.RuntimeContext(self._iterator, 
self._collector, subtask_index)
         self._env = env
         if info.chained_info is not None:
             info.chained_info.operator._configure_chain(self.context, 
self._collector, info.chained_info)

http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
index 8d1934c..77b53a2 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
@@ -25,8 +25,8 @@ class GroupReduceFunction(Function.Function):
     def __init__(self):
         super(GroupReduceFunction, self).__init__()
 
-    def _configure(self, input_file, output_file, port, env, info):
-        super(GroupReduceFunction, self)._configure(input_file, output_file, 
port, env, info)
+    def _configure(self, input_file, output_file, port, env, info, 
subtask_index):
+        super(GroupReduceFunction, self)._configure(input_file, output_file, 
port, env, info, subtask_index)
         if len(info.key1) == 0:
             self._run = self._run_all_group_reduce
         else:

http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
index b1d2201..08af276 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -24,8 +24,8 @@ class ReduceFunction(Function.Function):
     def __init__(self):
         super(ReduceFunction, self).__init__()
 
-    def _configure(self, input_file, output_file, port, env, info):
-        super(ReduceFunction, self)._configure(input_file, output_file, port, 
env, info)
+    def _configure(self, input_file, output_file, port, env, info, 
subtask_index):
+        super(ReduceFunction, self)._configure(input_file, output_file, port, 
env, info, subtask_index)
         if len(info.key1) == 0:
             self._run = self._run_all_reduce
         else:

http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
index 2977eb5..04608d4 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
@@ -18,13 +18,17 @@
 
 
 class RuntimeContext(object):
-    def __init__(self, iterator, collector):
+    def __init__(self, iterator, collector, subtask_index):
         self.iterator = iterator
         self.collector = collector
         self.broadcast_variables = dict()
+        self.subtask_id = subtask_index
 
     def _add_broadcast_variable(self, name, var):
         self.broadcast_variables[name] = var
 
     def get_broadcast_variable(self, name):
         return self.broadcast_variables[name]
+
+    def get_index_of_this_subtask(self):
+        return self.subtask_id

http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 698c236..fa83259 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -15,7 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import copy
+import collections
 import types as TYPES
 
 from flink.plan.Constants import _Identifier, WriteMode, 
_createKeyValueTypeInfo, _createArrayTypeInfo
@@ -572,6 +572,48 @@ class DataSet(object):
         self._info.parallelism.value = parallelism
         return self
 
+    def count_elements_per_partition(self):
+        """
+        Method that goes over all the elements in each partition in order to 
retrieve the total number of elements.
+        :return: A DataSet containing Tuples of subtask index, number of 
elements mappings.
+        """
+        class CountElementsPerPartitionMapper(MapPartitionFunction):
+            def map_partition(self, iterator, collector):
+                counter = 0
+                for x in iterator:
+                    counter += 1
+
+                collector.collect((self.context.get_index_of_this_subtask(), 
counter))
+        return self.map_partition(CountElementsPerPartitionMapper())
+
+    def zip_with_index(self):
+        """
+        Method that assigns a unique Long value to all elements of the 
DataSet. The generated values are consecutive.
+        :return: A DataSet of Tuples consisting of consecutive ids and initial 
values.
+        """
+        element_count = self.count_elements_per_partition()
+        class ZipWithIndexMapper(MapPartitionFunction):
+            start = -1
+
+            def _run(self):
+                offsets = self.context.get_broadcast_variable("counts")
+                offsets = sorted(offsets, key=lambda t: t[0]) # sort by task ID
+                offsets = collections.deque(offsets)
+
+                # compute the offset for each partition
+                for i in range(self.context.get_index_of_this_subtask()):
+                    self.start += offsets[i][1]
+
+                super(ZipWithIndexMapper, self)._run()
+
+            def map_partition(self, iterator, collector):
+                for value in iterator:
+                    self.start += 1
+                    collector.collect((self.start, value))
+        return self\
+            .map_partition(ZipWithIndexMapper())\
+            .with_broadcast_set("counts", element_count)
+
 
 class OperatorSet(DataSet):
     def __init__(self, env, info):

http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 3dbce45..9d08baf 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -184,6 +184,7 @@ class Environment(object):
                 port = int(sys.stdin.readline().rstrip('\n'))
 
                 id = int(sys.stdin.readline().rstrip('\n'))
+                subtask_index = int(sys.stdin.readline().rstrip('\n'))
                 input_path = sys.stdin.readline().rstrip('\n')
                 output_path = sys.stdin.readline().rstrip('\n')
 
@@ -193,7 +194,7 @@ class Environment(object):
                     if set.id == id:
                         used_set = set
                         operator = set.operator
-                operator._configure(input_path, output_path, port, self, 
used_set)
+                operator._configure(input_path, output_path, port, self, 
used_set, subtask_index)
                 operator._go()
                 operator._close()
                 sys.stdout.flush()

http://git-wip-us.apache.org/repos/asf/flink/blob/93c61c09/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
 
b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index c0a4414..223ff68 100644
--- 
a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ 
b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -42,9 +42,16 @@ if __name__ == "__main__":
 
     d6 = env.from_elements(1, 1, 12)
 
+    d7 = env.generate_sequence(0, 999)
+
     #Generate Sequence Source
-    d7 = env.generate_sequence(1, 5)\
-         .map(Id()).map_partition(Verify([1,2,3,4,5], "Sequence")).output()
+    d7.map(Id()).map_partition(Verify(range(1000), "Sequence")).output()
+
+    #Zip with Index
+    #check that IDs (first field of each element) are consecutive
+    d7.zip_with_index()\
+        .map(lambda x: x[0])\
+        .map_partition(Verify(range(1000), "ZipWithIndex")).output()
 
     #CSV Source/Sink
     csv_data = 
env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, 
STRING))

Reply via email to