This is an automated email from the ASF dual-hosted git repository. mergebot-role pushed a commit to branch mergebot in repository https://gitbox.apache.org/repos/asf/beam-site.git
commit 33ac606387fc0d68f678c0e43f521e87ec9b9544
Author: melissa <[email protected]>
AuthorDate: Wed Aug 23 15:25:47 2017 -0700

    [BEAM-1934] Add more CoGroupByKey content/examples
---
 src/documentation/programming-guide.md | 193 +++++++++++++++++++++++++++------
 1 file changed, 162 insertions(+), 31 deletions(-)

diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index 2ccbd35..c2f95ac 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -785,45 +785,176 @@ tree, [2]
 Thus, `GroupByKey` represents a transform from a multimap (multiple keys to
 individual values) to a uni-map (unique keys to collections of values).
 
+##### 4.2.2.1 GroupByKey and unbounded PCollections
+
+If you are using unbounded `PCollection`s, you must use either [non-global
+windowing](#setting-your-pcollections-windowing-function) or an
+[aggregation trigger](#triggers) in order to perform a `GroupByKey` or
+[CoGroupByKey](#cogroupbykey). This is because a bounded `GroupByKey` or
+`CoGroupByKey` must wait for all the data with a certain key to be collected,
+but with unbounded collections, the data is unlimited. Windowing and/or triggers
+allow grouping to operate on logical, finite bundles of data within the
+unbounded data streams.
+
+If you do apply `GroupByKey` or `CoGroupByKey` to a group of unbounded
+`PCollection`s without setting either a non-global windowing strategy, a trigger
+strategy, or both for each collection, Beam generates an IllegalStateException
+error at pipeline construction time.
+
+When using `GroupByKey` or `CoGroupByKey` to group `PCollection`s that have a
+[windowing strategy](#windowing) applied, all of the `PCollection`s you want to
+group *must use the same windowing strategy* and window sizing. For example, all
+of the collections you are merging must use (hypothetically) identical 5-minute
+fixed windows, or 4-minute sliding windows starting every 30 seconds.
+
+If your pipeline attempts to use `GroupByKey` or `CoGroupByKey` to merge
+`PCollection`s with incompatible windows, Beam generates an
+IllegalStateException error at pipeline construction time.
+
 #### 4.2.3. CoGroupByKey
 
-`CoGroupByKey` joins two or more key/value `PCollection`s that have the same key
-type, and then emits a collection of `KV<K, CoGbkResult>` pairs. [Design Your
-Pipeline]({{ site.baseurl }}/documentation/pipelines/design-your-pipeline/#multiple-sources)
+`CoGroupByKey` performs a relational join of two or more key/value
+`PCollection`s that have the same key type.
+[Design Your Pipeline]({{ site.baseurl }}/documentation/pipelines/design-your-pipeline/#multiple-sources)
 shows an example pipeline that uses a join.
 
-Given the input collections below:
+Consider using `CoGroupByKey` if you have multiple data sets that provide
+information about related things. For example, let's say you have two different
+files with user data: one file has names and email addresses; the other file
+has names and phone numbers. You can join those two data sets, using the user
+name as a common key and the other data as the associated values. After the
+join, you have one data set that contains all of the information (email
+addresses and phone numbers) associated with each name.
+
+If you are using unbounded `PCollection`s, you must use either [non-global
+windowing](#setting-your-pcollections-windowing-function) or an
+[aggregation trigger](#triggers) in order to perform a `CoGroupByKey`. See
+[GroupByKey and unbounded PCollections](#groupbykey-and-unbounded-pcollections)
+for more details.
+
+<span class="language-java">
+In the Beam SDK for Java, `CoGroupByKey` accepts a tuple of keyed
+`PCollection`s (`PCollection<KV<K, V>>`) as input. For type safety, the SDK
+requires you to pass each `PCollection` as part of a `KeyedPCollectionTuple`.
+You must declare a `TupleTag` for each input `PCollection` in the
+`KeyedPCollectionTuple` that you want to pass to `CoGroupByKey`. As output,
+`CoGroupByKey` returns a `PCollection<KV<K, CoGbkResult>>`, which groups values
+from all the input `PCollection`s by their common keys. Each key (all of type
+`K`) will have a different `CoGbkResult`, which is a map from `TupleTag<T>` to
+`Iterable<T>`. You can access a specific collection in a `CoGbkResult` object
+by using the `TupleTag` that you supplied with the initial collection.
+</span>
+<span class="language-py">
+In the Beam SDK for Python, `CoGroupByKey` accepts a dictionary of keyed
+`PCollection`s as input. As output, `CoGroupByKey` creates a single output
+`PCollection` that contains one key/value tuple for each key in the input
+`PCollection`s. Each key's value is a dictionary that maps each tag to an
+iterable of the values under the key in the corresponding `PCollection`.
+</span>
+
+The following conceptual examples use two input collections to show the mechanics of
+`CoGroupByKey`.
+
+<span class="language-java">
+The first set of data has a `TupleTag<String>` called `emailTag` and contains names
+and email addresses. The second set of data has a `TupleTag<String>` called
+`phoneTag` and contains names and phone numbers.
+</span>
+<span class="language-py">
+The first set of data contains names and email addresses. The second set of
+data contains names and phone numbers.
+</span>
+
+```java
+// This set of data has a `TupleTag<String>` called `emailTag`.
+  "amy" -> "[email protected]"
+  "carl" -> "[email protected]"
+  "julia" -> "[email protected]"
+  "carl" -> "[email protected]"
+
+// This set of data has a `TupleTag<String>` called `phoneTag`.
+  "amy" -> "111-222-3333"
+  "james" -> "222-333-4444"
+  "amy" -> "333-444-5555"
+  "carl" -> "444-555-6666"
 ```
-// collection 1
-user1, address1
-user2, address2
-user3, address3
+```py
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_group_by_key_cogroupbykey_tuple_inputs
+%}```
 
-// collection 2
-user1, order1
-user1, order2
-user2, order3
-guest, order4
-...
+After `CoGroupByKey`, the resulting data contains all data associated with each
+unique key from any of the input collections.
+
+```java
+  "amy" -> {
+    emailTag -> ["[email protected]"]
+    phoneTag -> ["111-222-3333", "333-444-5555"]
+  }
+  "carl" -> {
+    emailTag -> ["[email protected]", "[email protected]"]
+    phoneTag -> ["444-555-6666"]
+  }
+  "james" -> {
+    emailTag -> [],
+    phoneTag -> ["222-333-4444"]
+  }
+  "julia" -> {
+    emailTag -> ["[email protected]"],
+    phoneTag -> []
+  }
 ```
+```py
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_group_by_key_cogroupbykey_tuple_outputs
+%}```
 
-`CoGroupByKey` gathers up the values with the same key from all `PCollection`s,
-and outputs a new pair consisting of the unique key and an object `CoGbkResult`
-containing all values that were associated with that key. If you apply
-`CoGroupByKey` to the input collections above, the output collection would look
-like this:
+The following code example joins the two `PCollection`s with `CoGroupByKey`,
+followed by a `ParDo` to consume the result. Then, the code uses tags to look up
+and format data from each collection.
+
+```java
+  // Each set of key-value pairs is read into separate PCollections.
+  // Each shares a common key ("K").
+  PCollection<KV<K, V1>> pt1 = ...;
+  PCollection<KV<K, V2>> pt2 = ...;
+
+  // Create tuple tags for the value types in each collection.
+  final TupleTag<V1> t1 = new TupleTag<V1>();
+  final TupleTag<V2> t2 = new TupleTag<V2>();
+
+  // Merge collection values into a CoGbkResult collection
+  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+    KeyedPCollectionTuple.of(t1, pt1)
+                         .and(t2, pt2)
+                         .apply(CoGroupByKey.<K>create());
+
+  // Access results and do something with them.
+  PCollection<T> finalResultCollection =
+    coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, T>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+          // Get all collection 1 values
+          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
+          // Get all collection 2 values
+          Iterable<V2> pt2Vals = e.getValue().getAll(t2);
+          // ... Do something ...
+          c.output(...some T...);
+        }
+      }));
 ```
-user1, [[address1], [order1, order2]]
-user2, [[address2], [order3]]
-user3, [[address3], []]
-guest, [[], [order4]]
-...
-````
+```py
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:model_group_by_key_cogroupbykey_tuple
+%}```
 
-> **A Note on Key/Value Pairs:** Beam represents key/value pairs slightly
-> differently depending on the language and SDK you're using. In the Beam SDK
-> for Java, you represent a key/value pair with an object of type `KV<K, V>`. In
-> Python, you represent key/value pairs with 2-tuples.
+The formatted data looks like this:
+
+```java
+  Sample coming soon.
+```
+```py
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_group_by_key_cogroupbykey_tuple_formatted_outputs
+%}```
 
 #### 4.2.4. Combine
 
@@ -1078,7 +1209,7 @@ PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
 
 ```py
 # Flatten takes a tuple of PCollection objects.
-# Returns a single PCollection that contains all of the elements in the
+# Returns a single PCollection that contains all of the elements in the
 PCollection objects in that tuple.
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:model_multiple_pcollections_flatten
 %}
@@ -1998,7 +2129,7 @@ windows are not actually used until they're needed for the `GroupByKey`.
 Subsequent transforms, however, are applied to the result of the `GroupByKey` --
 data is grouped by both key and window.
 
-#### 7.1.2. Using windowing with bounded PCollections
+#### 7.1.2. Windowing with bounded PCollections
 
 You can use windowing with fixed-size data sets in **bounded** `PCollection`s.
 However, note that windowing considers only the implicit timestamps attached to

--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.
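
As a reading aid for the CoGroupByKey section added in this commit, the grouping it describes (every key from any input, mapped to one list of values per tag, with an empty list where an input has no value for that key) can be modelled in plain Python. This is only a sketch of the semantics, not the Beam API: the helper `co_group_by_key`, the tag names `emails` and `phones`, and the `example.com` addresses are all hypothetical stand-ins (the addresses in the email above are redacted).

```python
from collections import defaultdict


def co_group_by_key(tagged_inputs):
    """Model of the CoGroupByKey result shape, without Beam.

    tagged_inputs: {tag: [(key, value), ...]}
    returns:       {key: {tag: [values...]}} with an entry for every tag.
    """
    # Each new key starts with an empty list for every tag, so a key that is
    # missing from one input still reports [] for that input's tag.
    grouped = defaultdict(lambda: {tag: [] for tag in tagged_inputs})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


# Placeholder data mirroring the shape of the conceptual example.
emails = [
    ("amy", "amy@example.com"),
    ("carl", "carl@example.com"),
    ("julia", "julia@example.com"),
    ("carl", "carl.alt@example.com"),
]
phones = [
    ("amy", "111-222-3333"),
    ("james", "222-333-4444"),
    ("amy", "333-444-5555"),
    ("carl", "444-555-6666"),
]

joined = co_group_by_key({"emails": emails, "phones": phones})

for name in sorted(joined):
    print(name, joined[name])
```

In a real pipeline the inputs would be `PCollection`s and the grouping would happen per window; the dictionary-of-tags input shape here follows the Python SDK description in the diff.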
