[FLINK-2138] [streaming] Added docs and tests for partitioning

Closes #872


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f3aeb7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f3aeb7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f3aeb7e

Branch: refs/heads/master
Commit: 3f3aeb7e0a99f2f2af521fa880dc9d11743610f6
Parents: bc8d7c4
Author: Gábor Hermann <[email protected]>
Authored: Wed Jul 1 14:26:33 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Sat Jul 11 14:01:16 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |  13 ++
 docs/apis/streaming_guide.md                    |  19 ++
 .../flink/streaming/api/PartitionerTest.java    | 229 +++++++++++++++++++
 3 files changed, 261 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f3aeb7e/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 17903a9..edf2003 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -958,6 +958,19 @@ val result = in.partitionByHash(0).mapPartition { ... }
     </tr>
     </tr>
     <tr>
+      <td><strong>Custom Partitioning</strong></td>
+      <td>
+        <p>Manually specify a partitioning over the data.
+          <br/>
+          <i>Note</i>: This method works only on single field keys.</p>
+{% highlight scala %}
+val in: DataSet[(Int, String)] = // [...]
+val result = in
+  .partitionCustom(partitioner: Partitioner[K], key)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
       <td><strong>Sort Partition</strong></td>
       <td>
         <p>Locally sorts all partitions of a data set on a specified field in 
a specified order. 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f3aeb7e/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 7d8ab6d..e337ea8 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -311,6 +311,25 @@ Usage: `dataStream.broadcast()`
  * *Global*: All data points are directed to the first instance of the 
operator. 
 Usage: `dataStream.global()`
 
+Custom partitioning can also be used by giving a Partitioner function and a 
single field key to partition on, similarly to the batch API.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String,Integer>> in = // [...]
+DataStream<Tuple2<String,Integer>> result = in
+    .partitionCustom(Partitioner<K> partitioner, key)
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val in: DataStream[(Int, String)] = // [...]
+val result = in
+    .partitionCustom(partitioner: Partitioner[K], key)
+{% endhighlight %}
+</div>
+</div>
+
 By default *Forward* partitioning is used. 
 
 Partitioning does not remain in effect after a transformation, so it needs to 
be set again for subsequent operations.

http://git-wip-us.apache.org/repos/asf/flink/blob/3f3aeb7e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
new file mode 100644
index 0000000..c858834
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestListResultSink;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+/**
+ * IT case that tests the different stream partitioning schemes.
+ */
+public class PartitionerTest {
+
+       public static final int PARALLELISM = 3;
+       public static final int MEMORY_SIZE = 32;
+
+       @Test
+       public void partitionerTest() {
+
+               TestListResultSink<Tuple2<Integer, String>> 
hashPartitionResultSink =
+                               new TestListResultSink<Tuple2<Integer, 
String>>();
+               TestListResultSink<Tuple2<Integer, String>> 
customPartitionResultSink =
+                               new TestListResultSink<Tuple2<Integer, 
String>>();
+               TestListResultSink<Tuple2<Integer, String>> 
broadcastPartitionResultSink =
+                               new TestListResultSink<Tuple2<Integer, 
String>>();
+               TestListResultSink<Tuple2<Integer, String>> 
forwardPartitionResultSink =
+                               new TestListResultSink<Tuple2<Integer, 
String>>();
+               TestListResultSink<Tuple2<Integer, String>> 
rebalancePartitionResultSink =
+                               new TestListResultSink<Tuple2<Integer, 
String>>();
+               TestListResultSink<Tuple2<Integer, String>> 
globalPartitionResultSink =
+                               new TestListResultSink<Tuple2<Integer, 
String>>();
+
+
+               StreamExecutionEnvironment env = new 
TestStreamEnvironment(PARALLELISM, MEMORY_SIZE);
+               DataStream<Tuple1<String>> src = env.fromElements(
+                               new Tuple1<String>("a"),
+                               new Tuple1<String>("b"),
+                               new Tuple1<String>("b"),
+                               new Tuple1<String>("a"),
+                               new Tuple1<String>("a"),
+                               new Tuple1<String>("c"),
+                               new Tuple1<String>("a")
+               );
+
+               // partition by hash
+               src
+                               .partitionByHash(0)
+                               .map(new SubtaskIndexAssigner())
+                               .addSink(hashPartitionResultSink);
+
+               // partition custom
+               DataStream<Tuple2<Integer, String>> partitionCustom = src
+                               .partitionCustom(new Partitioner<String>() {
+                                       @Override
+                                       public int partition(String key, int 
numPartitions) {
+                                               if (key.equals("c")) {
+                                                       return 2;
+                                               } else {
+                                                       return 0;
+                                               }
+                                       }
+                               }, 0)
+                               .map(new SubtaskIndexAssigner());
+
+               partitionCustom.addSink(customPartitionResultSink);
+
+               // partition broadcast
+               src.broadcast().map(new 
SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink);
+
+               // partition forward
+               src.map(new 
SubtaskIndexAssigner()).addSink(forwardPartitionResultSink);
+
+               // partition rebalance
+               src.rebalance().map(new 
SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink);
+
+               // partition global
+               src.global().map(new 
SubtaskIndexAssigner()).addSink(globalPartitionResultSink);
+
+               try {
+                       env.execute();
+               } catch (Exception e) {
+                       fail(e.getMessage());
+               }
+
+               List<Tuple2<Integer, String>> hashPartitionResult = 
hashPartitionResultSink.getResult();
+               List<Tuple2<Integer, String>> customPartitionResult = 
customPartitionResultSink.getResult();
+               List<Tuple2<Integer, String>> broadcastPartitionResult = 
broadcastPartitionResultSink.getResult();
+               List<Tuple2<Integer, String>> forwardPartitionResult = 
forwardPartitionResultSink.getResult();
+               List<Tuple2<Integer, String>> rebalancePartitionResult = 
rebalancePartitionResultSink.getResult();
+               List<Tuple2<Integer, String>> globalPartitionResult = 
globalPartitionResultSink.getResult();
+
+               verifyHashPartitioning(hashPartitionResult);
+               verifyCustomPartitioning(customPartitionResult);
+               verifyBroadcastPartitioning(broadcastPartitionResult);
+               verifyRebalancePartitioning(forwardPartitionResult);
+               verifyRebalancePartitioning(rebalancePartitionResult);
+               verifyGlobalPartitioning(globalPartitionResult);
+       }
+
+       private static void verifyHashPartitioning(List<Tuple2<Integer, 
String>> hashPartitionResult) {
+               HashMap<String, Integer> verifier = new HashMap<String, 
Integer>();
+               for (Tuple2<Integer, String> elem : hashPartitionResult) {
+                       Integer subtaskIndex = verifier.get(elem.f1);
+                       if (subtaskIndex == null) {
+                               verifier.put(elem.f1, elem.f0);
+                       } else if (subtaskIndex != elem.f0) {
+                               fail();
+                       }
+               }
+       }
+
+       private static void verifyCustomPartitioning(List<Tuple2<Integer, 
String>> customPartitionResult) {
+               for (Tuple2<Integer, String> stringWithSubtask : 
customPartitionResult) {
+                       if (stringWithSubtask.f1.equals("c")) {
+                               assertEquals(new Integer(2), 
stringWithSubtask.f0);
+                       } else {
+                               assertEquals(new Integer(0), 
stringWithSubtask.f0);
+                       }
+               }
+       }
+
+       private static void verifyBroadcastPartitioning(List<Tuple2<Integer, 
String>> broadcastPartitionResult) {
+               List<Tuple2<Integer, String>> expected = Arrays.asList(
+                               new Tuple2<Integer, String>(0, "a"),
+                               new Tuple2<Integer, String>(0, "b"),
+                               new Tuple2<Integer, String>(0, "b"),
+                               new Tuple2<Integer, String>(0, "a"),
+                               new Tuple2<Integer, String>(0, "a"),
+                               new Tuple2<Integer, String>(0, "c"),
+                               new Tuple2<Integer, String>(0, "a"),
+                               new Tuple2<Integer, String>(1, "a"),
+                               new Tuple2<Integer, String>(1, "b"),
+                               new Tuple2<Integer, String>(1, "b"),
+                               new Tuple2<Integer, String>(1, "a"),
+                               new Tuple2<Integer, String>(1, "a"),
+                               new Tuple2<Integer, String>(1, "c"),
+                               new Tuple2<Integer, String>(1, "a"),
+                               new Tuple2<Integer, String>(2, "a"),
+                               new Tuple2<Integer, String>(2, "b"),
+                               new Tuple2<Integer, String>(2, "b"),
+                               new Tuple2<Integer, String>(2, "a"),
+                               new Tuple2<Integer, String>(2, "a"),
+                               new Tuple2<Integer, String>(2, "c"),
+                               new Tuple2<Integer, String>(2, "a"));
+
+               assertEquals(
+                               new HashSet<Tuple2<Integer, String>>(expected),
+                               new HashSet<Tuple2<Integer, 
String>>(broadcastPartitionResult));
+       }
+
+       private static void verifyRebalancePartitioning(List<Tuple2<Integer, 
String>> rebalancePartitionResult) {
+               List<Tuple2<Integer, String>> expected = Arrays.asList(
+                               new Tuple2<Integer, String>(0, "a"),
+                               new Tuple2<Integer, String>(1, "b"),
+                               new Tuple2<Integer, String>(2, "b"),
+                               new Tuple2<Integer, String>(0, "a"),
+                               new Tuple2<Integer, String>(1, "a"),
+                               new Tuple2<Integer, String>(2, "c"),
+                               new Tuple2<Integer, String>(0, "a"));
+
+               assertEquals(
+                               new HashSet<Tuple2<Integer, String>>(expected),
+                               new HashSet<Tuple2<Integer, 
String>>(rebalancePartitionResult));
+       }
+
+       private static void verifyGlobalPartitioning(List<Tuple2<Integer, 
String>> globalPartitionResult) {
+               List<Tuple2<Integer, String>> expected = Arrays.asList(
+                               new Tuple2<Integer, String>(0, "a"),
+                               new Tuple2<Integer, String>(0, "b"),
+                               new Tuple2<Integer, String>(0, "b"),
+                               new Tuple2<Integer, String>(0, "a"),
+                               new Tuple2<Integer, String>(0, "a"),
+                               new Tuple2<Integer, String>(0, "c"),
+                               new Tuple2<Integer, String>(0, "a"));
+
+               assertEquals(
+                               new HashSet<Tuple2<Integer, String>>(expected),
+                               new HashSet<Tuple2<Integer, 
String>>(globalPartitionResult));
+       }
+
+       private static class SubtaskIndexAssigner
+                       extends RichMapFunction<Tuple1<String>, Tuple2<Integer, 
String>> {
+
+               private int indexOfSubtask;
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       RuntimeContext runtimeContext = getRuntimeContext();
+                       indexOfSubtask = runtimeContext.getIndexOfThisSubtask();
+               }
+
+               @Override
+               public Tuple2<Integer, String> map(Tuple1<String> value) throws 
Exception {
+                       return new Tuple2<Integer, String>(indexOfSubtask, 
value.f0);
+               }
+       }
+}

Reply via email to