[FLINK-2138] [streaming] Added docs and tests for partitioning

Closes #872
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f3aeb7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f3aeb7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f3aeb7e

Branch: refs/heads/master
Commit: 3f3aeb7e0a99f2f2af521fa880dc9d11743610f6
Parents: bc8d7c4
Author: Gábor Hermann <[email protected]>
Authored: Wed Jul 1 14:26:33 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Sat Jul 11 14:01:16 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md               |  13 ++
 docs/apis/streaming_guide.md                 |  19 ++
 .../flink/streaming/api/PartitionerTest.java | 229 +++++++++++++++++++
 3 files changed, 261 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f3aeb7e/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 17903a9..edf2003 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -958,6 +958,19 @@ val result = in.partitionByHash(0).mapPartition { ... }
     </tr>
   </tr>
   <tr>
+    <td><strong>Custom Partitioning</strong></td>
+    <td>
+      <p>Manually specify a partitioning over the data.
+        <br/>
+        <i>Note</i>: This method works only on single field keys.</p>
+{% highlight scala %}
+val in: DataSet[(Int, String)] = // [...]
+val result = in
+  .partitionCustom(partitioner: Partitioner[K], key)
+{% endhighlight %}
+    </td>
+  </tr>
+  <tr>
     <td><strong>Sort Partition</strong></td>
     <td>
       <p>Locally sorts all partitions of a data set on a specified field in a specified order.


http://git-wip-us.apache.org/repos/asf/flink/blob/3f3aeb7e/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 7d8ab6d..e337ea8 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -311,6 +311,25 @@ Usage: `dataStream.broadcast()`
 * *Global*: All data points are directed to the first instance of the operator.
 Usage: `dataStream.global()`
 
+Custom partitioning can also be used by giving a Partitioner function and a single field key to partition on, similarly to the batch API.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Integer>> in = // [...]
+DataStream<Tuple2<String, Integer>> result = in
+    .partitionCustom(Partitioner<K> partitioner, key)
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val in: DataStream[(Int, String)] = // [...]
+val result = in
+  .partitionCustom(partitioner: Partitioner[K], key)
+{% endhighlight %}
+</div>
+</div>
+
 By default *Forward* partitioning is used.
 Partitioning does not remain in effect after a transformation, so it needs to be set again for subsequent operations.
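For reference, a minimal runnable sketch of the streaming custom-partitioning call documented above. The class name CustomPartitionExample and the sample data are illustrative and not part of this commit; it assumes the DataStream API as of this commit.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitionExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> in = env.fromElements(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2),
                new Tuple2<String, Integer>("b", 3));

        // Route "b" records to subtask 1 and everything else to subtask 0;
        // the modulo guards against running with parallelism 1.
        // Field index 0 of the tuple is used as the partitioning key.
        DataStream<Tuple2<String, Integer>> result = in.partitionCustom(
                new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        return (key.equals("b") ? 1 : 0) % numPartitions;
                    }
                }, 0);

        result.print();
        env.execute("Custom partitioning example");
    }
}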
http://git-wip-us.apache.org/repos/asf/flink/blob/3f3aeb7e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
new file mode 100644
index 0000000..c858834
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestListResultSink;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+/**
+ * IT case that tests the different stream partitioning schemes.
+ */
+public class PartitionerTest {
+
+    public static final int PARALLELISM = 3;
+    public static final int MEMORY_SIZE = 32;
+
+    @Test
+    public void partitionerTest() {
+
+        TestListResultSink<Tuple2<Integer, String>> hashPartitionResultSink =
+                new TestListResultSink<Tuple2<Integer, String>>();
+        TestListResultSink<Tuple2<Integer, String>> customPartitionResultSink =
+                new TestListResultSink<Tuple2<Integer, String>>();
+        TestListResultSink<Tuple2<Integer, String>> broadcastPartitionResultSink =
+                new TestListResultSink<Tuple2<Integer, String>>();
+        TestListResultSink<Tuple2<Integer, String>> forwardPartitionResultSink =
+                new TestListResultSink<Tuple2<Integer, String>>();
+        TestListResultSink<Tuple2<Integer, String>> rebalancePartitionResultSink =
+                new TestListResultSink<Tuple2<Integer, String>>();
+        TestListResultSink<Tuple2<Integer, String>> globalPartitionResultSink =
+                new TestListResultSink<Tuple2<Integer, String>>();
+
+        StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORY_SIZE);
+        DataStream<Tuple1<String>> src = env.fromElements(
+                new Tuple1<String>("a"),
+                new Tuple1<String>("b"),
+                new Tuple1<String>("b"),
+                new Tuple1<String>("a"),
+                new Tuple1<String>("a"),
+                new Tuple1<String>("c"),
+                new Tuple1<String>("a")
+        );
+
+        // partition by hash
+        src
+                .partitionByHash(0)
+                .map(new SubtaskIndexAssigner())
+                .addSink(hashPartitionResultSink);
+
+        // partition custom
+        DataStream<Tuple2<Integer, String>> partitionCustom = src
+                .partitionCustom(new Partitioner<String>() {
+                    @Override
+                    public int partition(String key, int numPartitions) {
+                        if (key.equals("c")) {
+                            return 2;
+                        } else {
+                            return 0;
+                        }
+                    }
+                }, 0)
+                .map(new SubtaskIndexAssigner());
+
+        partitionCustom.addSink(customPartitionResultSink);
+
+        // partition broadcast
+        src.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink);
+
+        // partition forward
+        src.map(new SubtaskIndexAssigner()).addSink(forwardPartitionResultSink);
+
+        // partition rebalance
+        src.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink);
+
+        // partition global
+        src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink);
+
+        try {
+            env.execute();
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+
+        List<Tuple2<Integer, String>> hashPartitionResult = hashPartitionResultSink.getResult();
+        List<Tuple2<Integer, String>> customPartitionResult = customPartitionResultSink.getResult();
+        List<Tuple2<Integer, String>> broadcastPartitionResult = broadcastPartitionResultSink.getResult();
+        List<Tuple2<Integer, String>> forwardPartitionResult = forwardPartitionResultSink.getResult();
+        List<Tuple2<Integer, String>> rebalancePartitionResult = rebalancePartitionResultSink.getResult();
+        List<Tuple2<Integer, String>> globalPartitionResult = globalPartitionResultSink.getResult();
+
+        verifyHashPartitioning(hashPartitionResult);
+        verifyCustomPartitioning(customPartitionResult);
+        verifyBroadcastPartitioning(broadcastPartitionResult);
+        verifyRebalancePartitioning(forwardPartitionResult);
+        verifyRebalancePartitioning(rebalancePartitionResult);
+        verifyGlobalPartitioning(globalPartitionResult);
+    }
+
+    private static void verifyHashPartitioning(List<Tuple2<Integer, String>> hashPartitionResult) {
+        HashMap<String, Integer> verifier = new HashMap<String, Integer>();
+        for (Tuple2<Integer, String> elem : hashPartitionResult) {
+            Integer subtaskIndex = verifier.get(elem.f1);
+            if (subtaskIndex == null) {
+                verifier.put(elem.f1, elem.f0);
+            } else if (!subtaskIndex.equals(elem.f0)) {
+                fail();
+            }
+        }
+    }
+
+    private static void verifyCustomPartitioning(List<Tuple2<Integer, String>> customPartitionResult) {
+        for (Tuple2<Integer, String> stringWithSubtask : customPartitionResult) {
+            if (stringWithSubtask.f1.equals("c")) {
+                assertEquals(new Integer(2), stringWithSubtask.f0);
+            } else {
+                assertEquals(new Integer(0), stringWithSubtask.f0);
+            }
+        }
+    }
+
+    private static void verifyBroadcastPartitioning(List<Tuple2<Integer, String>> broadcastPartitionResult) {
+        List<Tuple2<Integer, String>> expected = Arrays.asList(
+                new Tuple2<Integer, String>(0, "a"),
+                new Tuple2<Integer, String>(0, "b"),
+                new Tuple2<Integer, String>(0, "b"),
+                new Tuple2<Integer, String>(0, "a"),
+                new Tuple2<Integer, String>(0, "a"),
+                new Tuple2<Integer, String>(0, "c"),
+                new Tuple2<Integer, String>(0, "a"),
+                new Tuple2<Integer, String>(1, "a"),
+                new Tuple2<Integer, String>(1, "b"),
+                new Tuple2<Integer, String>(1, "b"),
+                new Tuple2<Integer, String>(1, "a"),
+                new Tuple2<Integer, String>(1, "a"),
+                new Tuple2<Integer, String>(1, "c"),
+                new Tuple2<Integer, String>(1, "a"),
+                new Tuple2<Integer, String>(2, "a"),
+                new Tuple2<Integer, String>(2, "b"),
+                new Tuple2<Integer, String>(2, "b"),
+                new Tuple2<Integer, String>(2, "a"),
+                new Tuple2<Integer, String>(2, "a"),
+                new Tuple2<Integer, String>(2, "c"),
+                new Tuple2<Integer, String>(2, "a"));
+
+        assertEquals(
+                new HashSet<Tuple2<Integer, String>>(expected),
+                new HashSet<Tuple2<Integer, String>>(broadcastPartitionResult));
+    }
+
+    private static void verifyRebalancePartitioning(List<Tuple2<Integer, String>> rebalancePartitionResult) {
+        List<Tuple2<Integer, String>> expected = Arrays.asList(
+                new Tuple2<Integer, String>(0, "a"),
+                new Tuple2<Integer, String>(1, "b"),
+                new Tuple2<Integer, String>(2, "b"),
+                new Tuple2<Integer, String>(0, "a"),
+                new Tuple2<Integer, String>(1, "a"),
+                new Tuple2<Integer, String>(2, "c"),
+                new Tuple2<Integer, String>(0, "a"));
+
+        assertEquals(
+                new HashSet<Tuple2<Integer, String>>(expected),
+                new HashSet<Tuple2<Integer, String>>(rebalancePartitionResult));
+    }
+
+    private static void verifyGlobalPartitioning(List<Tuple2<Integer, String>> globalPartitionResult) {
+        List<Tuple2<Integer, String>> expected = Arrays.asList(
+                new Tuple2<Integer, String>(0, "a"),
+                new Tuple2<Integer, String>(0, "b"),
+                new Tuple2<Integer, String>(0, "b"),
+                new Tuple2<Integer, String>(0, "a"),
+                new Tuple2<Integer, String>(0, "a"),
+                new Tuple2<Integer, String>(0, "c"),
+                new Tuple2<Integer, String>(0, "a"));
+
+        assertEquals(
+                new HashSet<Tuple2<Integer, String>>(expected),
+                new HashSet<Tuple2<Integer, String>>(globalPartitionResult));
+    }
+
+    private static class SubtaskIndexAssigner
+            extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
+
+        private int indexOfSubtask;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            RuntimeContext runtimeContext = getRuntimeContext();
+            indexOfSubtask = runtimeContext.getIndexOfThisSubtask();
+        }
+
+        @Override
+        public Tuple2<Integer, String> map(Tuple1<String> value) throws Exception {
+            return new Tuple2<Integer, String>(indexOfSubtask, value.f0);
+        }
+    }
+}
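For comparison, a minimal sketch of the equivalent batch-API call documented in the programming_guide.md hunk above. The class name BatchCustomPartitionExample and the sample data are illustrative and not part of this commit; it assumes the DataSet API as of this commit, where print() triggers execution eagerly.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BatchCustomPartitionExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> in = env.fromElements(
                new Tuple2<Integer, String>(1, "a"),
                new Tuple2<Integer, String>(2, "b"),
                new Tuple2<Integer, String>(3, "c"));

        // Same single-field-key restriction as noted in the docs:
        // the key is given as a single tuple field index (here, field 0).
        DataSet<Tuple2<Integer, String>> result = in.partitionCustom(
                new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        // Spread keys across partitions by their value.
                        return key % numPartitions;
                    }
                }, 0);

        result.print();
    }
}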
